[GitHub] apex-malhar pull request #388: Tim serde interface

2016-08-28 Thread ilooner
Github user ilooner closed the pull request at:

https://github.com/apache/apex-malhar/pull/388


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] apex-malhar pull request #388: Tim serde interface

2016-08-28 Thread ilooner
GitHub user ilooner opened a pull request:

https://github.com/apache/apex-malhar/pull/388

Tim serde interface



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/simplifi-it/kelp tim-serdeInterface

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-malhar/pull/388.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #388


commit 3897e59b2f4bfc6958c14725d27dccf238f4fb5e
Author: Chandni Singh <csi...@apache.org>
Date:   2016-08-01T07:37:07Z

APEXMALHAR-2063 Made window data manager use file system wal

commit ac6873f6663f86f94050799ed241a702a8ae3573
Author: ilooner <timothytiborfar...@gmail.com>
Date:   2016-08-08T04:17:13Z

Merge pull request #1 from chandnisingh/APEXMALHAR-2063

APEXMALHAR-2063 Made window data manager use file system wal

commit d54b2de283ce829dfa7756295f782a451f1e3472
Author: Timothy Farkas <t...@datatorrent.com>
Date:   2016-07-17T21:32:34Z

Added implementations of SpillableList, SpillableMap, and SpillableArrayListMultimap

commit f32c2eb3ac2b2067efefbb182cff6d5744b16b10
Author: Timothy Farkas <t...@datatorrent.com>
Date:   2016-08-08T06:18:42Z

 - Deleted obsolete files
 - Ignored failing Spillable tests

commit 960dcc0b2a09b91bd2d67e1c277033bdf3f9a369
Author: Chandni Singh <csi...@apache.org>
Date:   2016-08-08T07:54:57Z

Added kelp modules

commit b95dcd64e308900d87aa7ed6f8690c31f8070f13
Author: Chandni Singh <csi...@apache.org>
Date:   2016-08-09T01:46:51Z

APEXMALHAR-2063 IncrementalCheckpointManager was using old api

commit 1a140d3a89cb20a49cccaefa98e8d73bfbe5c577
Author: Timothy Farkas <t...@simplifi.it>
Date:   2016-08-09T02:47:25Z

Deleting unnecesary code

commit b54aebcab53627757fc1405a499ee1de3b150f00
Author: Chandni Singh <csi...@apache.org>
Date:   2016-08-09T03:21:03Z

KELP-1: On local fs, output stream needs to closed to consume data

commit 28be4dcb9bfc8b74bb49915ca45024439b6a712d
Author: Chandni Singh <csi...@apache.org>
Date:   2016-08-09T03:35:56Z

Merge branch 'master' of github.com:simplifi-it/kelp

commit e14ab2870a822c488c3c7812cbd87e3b409ccd70
Author: Timothy Farkas <t...@simplifi.it>
Date:   2016-08-15T01:37:19Z

Removed unnecessary operators

commit 415c657ef3094744d4a5b035e039b1a624f0e6c5
Author: Timothy Farkas <t...@simplifi.it>
Date:   2016-08-15T02:43:04Z

Deleted uneeded operators and modules.

commit e4327ad2d3a1e6355ac8d2e44e448ae49707d44c
Author: Timothy Farkas <t...@simplifi.it>
Date:   2016-08-21T18:58:48Z

Deleted uneccessary operators and Added FieldDescriptorSerializer

commit 2fef8276a8ef70699d61fca1ce8a97a6017e6b9a
Author: Timothy Farkas <t...@simplifi.it>
Date:   2016-08-21T19:33:40Z

Made bytesList transient

commit 1d6d1a31ddd32a90b4749b4ba92150d19b861803
Author: Timothy Farkas <t...@simplifi.it>
Date:   2016-08-21T22:44:15Z

Fix bug where incorrect offset will be set on mutable int

commit 4fbdedcd95ad5945478dc8cb21c1adf5775c6ff9
Author: Timothy Farkas <t...@simplifi.it>
Date:   2016-08-25T07:30:10Z

Compile the kafka module by default

commit d4fe8bb0df203161dce516f1bfb28db6dfec5b74
Author: Chandni Singh <csi...@apache.org>
Date:   2016-08-27T03:16:17Z

APEXMALHAR-2063 Made window data manager use file system wal

commit ed16071bf959211a6636b337d4ea070bdd32e908
Author: Timothy Farkas <t...@simplifi.it>
Date:   2016-08-28T22:37:27Z

Added Serializer and Deserializer interfaces






[GitHub] apex-malhar pull request #322: APEXMALHAR-2063 Made window data manager use ...

2016-08-10 Thread ilooner
Github user ilooner commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/322#discussion_r74370558
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java 
---
@@ -51,291 +66,607 @@
 public class FSWindowDataManager implements WindowDataManager
 {
   private static final String DEF_RECOVERY_PATH = "idempotentState";
-
-  protected transient FSStorageAgent storageAgent;
+  private static final String WAL_FILE_NAME = "wal";
 
   /**
-   * Recovery path relative to app path where state is saved.
+   * Recovery filePath relative to app filePath where state is saved.
    */
   @NotNull
-  private String recoveryPath;
+  private String recoveryPath = DEF_RECOVERY_PATH;
 
   private boolean isRecoveryPathRelativeToAppPath = true;
 
   /**
-   * largest window for which there is recovery data across all physical operator instances.
+   * This is not null only for one physical instance.
+   * It consists of operator ids which have been deleted but have some state that can be replayed.
+   * Only one of the instances would be handling (modifying) the files that belong to this state. 
+   * The value is assigned during partitioning.
    */
-  protected transient long largestRecoveryWindow;
+  private Set<Integer> deletedOperators;
+
+  private boolean repartitioned;
 
   /**
-   * This is not null only for one physical instance.
-   * It consists of operator ids which have been deleted but have some state that can be replayed.
-   * Only one of the instances would be handling (modifying) the files that belong to this state.
+   * Used when it is not necessary to replay every streaming/app window.
+   * Used by {@link IncrementalCheckpointManager}
    */
-  protected Set<Integer> deletedOperators;
+  private boolean relyOnCheckpoints;
 
   /**
-   * Sorted mapping from window id to all the operators that have state to replay for that window.
+   * largest window for which there is recovery data across all physical operator instances.
    */
-  protected final transient TreeMultimap<Long, Integer> replayState;
+  private transient long largestRecoveryWindow = Stateless.WINDOW_ID;
+
+  private final FSWindowReplayWAL wal = new FSWindowReplayWAL();
 
-  protected transient FileSystem fs;
-  protected transient Path appPath;
+  //operator id -> wals (sorted)
+  private final transient Map<Integer, FSWindowReplayWAL> readOnlyWals = new HashMap<>();
+
+  private transient String statePath;
+  private transient int operatorId;
+
+  private final transient Kryo kryo = new Kryo();
+
+  private transient FileContext fileContext;
 
   public FSWindowDataManager()
   {
-    replayState = TreeMultimap.create();
-    largestRecoveryWindow = Stateless.WINDOW_ID;
-    recoveryPath = DEF_RECOVERY_PATH;
+    kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
--- End diff --

for my learning :) why is this needed?
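One likely motivation, offered as an assumption rather than the author's answer: when Kryo deserializes, it resolves class names through a `ClassLoader`, and by default that is the loader that loaded the Kryo class itself (worth confirming for the Kryo version in use). In a container where application classes come from a different loader, pointing Kryo at the thread context class loader lets it find them. The plain-Java sketch below has no Kryo dependency; `resolveClass` is a hypothetical helper that applies the same lookup strategy with `Class.forName`.

```java
public class ContextClassLoaderSketch
{
  /**
   * Resolves a class by name through the thread context class loader, falling back to
   * this class's own loader when no context loader is set. This approximates the effect of
   * kryo.setClassLoader(Thread.currentThread().getContextClassLoader()) on name lookups.
   */
  static Class<?> resolveClass(String className) throws ClassNotFoundException
  {
    ClassLoader loader = Thread.currentThread().getContextClassLoader();
    if (loader == null) {
      loader = ContextClassLoaderSketch.class.getClassLoader();
    }
    // initialize = false: load the class without running its static initializers
    return Class.forName(className, false, loader);
  }

  public static void main(String[] args) throws Exception
  {
    System.out.println(resolveClass("java.util.HashMap").getName());
  }
}
```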




[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

2016-08-07 Thread ilooner
Github user ilooner commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r73809225
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java
 ---
@@ -0,0 +1,292 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.utils.serde.PassThruByteArraySliceSerde;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
+import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
+
+import com.esotericsoftware.kryo.DefaultSerializer;
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multiset;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Created by tfarkas on 6/12/16.
+ */
+@DefaultSerializer(FieldSerializer.class)
+public class SpillableByteArrayListMultimapImpl<K, V> implements Spillable.SpillableByteArrayListMultimap<K, V>,
+    Spillable.SpillableComponent
+{
+  public static final int DEFAULT_BATCH_SIZE = 1000;
+  public static final byte[] SIZE_KEY_SUFFIX = new byte[]{(byte)0, (byte)0, (byte)0};
+
+  private transient WindowBoundedMapCache<K, SpillableArrayListImpl<V>> cache = new WindowBoundedMapCache<>();
+  private transient boolean isRunning = false;
+  private transient boolean isInWindow = false;
+
+  private int batchSize = DEFAULT_BATCH_SIZE;
+  @NotNull
+  private SpillableByteMapImpl<byte[], Integer> map;
+  private SpillableStateStore store;
+  private byte[] identifier;
+  private long bucket;
+  private Serde<K, Slice> serdeKey;
+  private Serde<V, Slice> serdeValue;
+
+  private SpillableByteArrayListMultimapImpl()
+  {
+    // for kryo
+  }
+
+  public SpillableByteArrayListMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket,
+      Serde<K, Slice> serdeKey,
+      Serde<V, Slice> serdeValue)
+  {
+    this.store = Preconditions.checkNotNull(store);
+    this.identifier = Preconditions.checkNotNull(identifier);
+    this.bucket = bucket;
+    this.serdeKey = Preconditions.checkNotNull(serdeKey);
+    this.serdeValue = Preconditions.checkNotNull(serdeValue);
+
+    map = new SpillableByteMapImpl(store, identifier, bucket, new PassThruByteArraySliceSerde(), new SerdeIntSlice());
+  }
+
+  public SpillableStateStore getStore()
+  {
+    return store;
+  }
+
+  @Override
+  public List<V> get(@Nullable K key)
+  {
+    return getHelper(key);
+  }
+
+  private SpillableArrayListImpl<V> getHelper(@Nullable K key)
+  {
+    SpillableArrayListImpl<V> spillableArrayList = cache.get(key);
+
+    if (spillableArrayList == null) {
+      Slice keySlice = serdeKey.serialize(key);
+      Integer size = map.get(SliceUtils.concatenate(keySlice, SIZE_KEY_SUFFIX).toByteArray());
--- End diff --

@davidyan74 no the Spillable map prefixes each key with its identifier 
internally. It is not necessary for you to manually prefix the keys yourself.
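A byte-level sketch of that division of labour, assuming the layout described in the comment: the multimap caller appends `SIZE_KEY_SUFFIX` to the serialized key, and the map itself prepends its identifier. `storedKey` is a hypothetical stand-in for that internal step, not the actual `SpillableByteMapImpl` code, so the exact internal layout should be checked against the implementation.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class SpillableKeyLayoutSketch
{
  // Same three zero bytes as SIZE_KEY_SUFFIX in SpillableByteArrayListMultimapImpl.
  static final byte[] SIZE_KEY_SUFFIX = new byte[]{(byte)0, (byte)0, (byte)0};

  static byte[] concat(byte[] first, byte[] second)
  {
    byte[] out = Arrays.copyOf(first, first.length + second.length);
    System.arraycopy(second, 0, out, first.length, second.length);
    return out;
  }

  /** Key built by the multimap: serialized user key followed by the size suffix. */
  static byte[] sizeKey(byte[] serializedKey)
  {
    return concat(serializedKey, SIZE_KEY_SUFFIX);
  }

  /** Hypothetical stand-in for the identifier prefixing the spillable map does internally. */
  static byte[] storedKey(byte[] mapIdentifier, byte[] mapKey)
  {
    return concat(mapIdentifier, mapKey);
  }

  public static void main(String[] args)
  {
    byte[] identifier = new byte[]{(byte)1};
    byte[] userKey = "a".getBytes(StandardCharsets.UTF_8);
    // Prints [1, 97, 0, 0, 0]: identifier, then 'a', then the three-byte suffix.
    System.out.println(Arrays.toString(storedKey(identifier, sizeKey(userKey))));
  }
}
```

Because the identifier is added inside the map, two structures sharing one store cannot collide as long as their identifiers differ, and callers only ever deal with their own key bytes.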




[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

2016-08-03 Thread ilooner
GitHub user ilooner reopened a pull request:

https://github.com/apache/apex-malhar/pull/324

Spillable Datastructures PR for review only



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ilooner/incubator-apex-malhar APEXMALHAR-2048_pull

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-malhar/pull/324.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #324


commit b72ee18e6f7cf0a9d30b195439912d96acafb3b4
Author: Timothy Farkas <t...@datatorrent.com>
Date:   2016-07-17T21:32:34Z

Added implementations of SpillableList, SpillableMap, and SpillableArrayListMultimap

commit 184653a23f662f78e7e6a7e1d53ffb0efbdb7127
Author: Timothy Farkas <t...@datatorrent.com>
Date:   2016-07-18T01:56:41Z

Added SpillableComplexComponentImpl

commit 9f17b4ba9233e3f46746505941a378236193f719
Author: Timothy Farkas <t...@datatorrent.com>
Date:   2016-07-18T03:29:07Z

Added propagating callbacks to store

commit fe41f0c20235aac3ca57facc2491ecfba11d20a7
Author: Timothy Farkas <t...@datatorrent.com>
Date:   2016-07-21T06:38:48Z

Added checkpoint callbacks to spillable complex components
Added some half completed tests

commit e5cacbbc9e1ae18f5cf0938f13377f75a3a99cd2
Author: Timothy Farkas <t...@datatorrent.com>
Date:   2016-07-24T04:09:54Z

Finished unit test for SpillableArrayListMultimap

commit dc258b8900688264f349307737b59b096dbc3d2b
Author: Timothy Farkas <t...@datatorrent.com>
Date:   2016-07-24T05:19:37Z

Added unit test which uses managed state

commit ed9924b810a76c39404701da81f4753ab68af5a5
Author: Timothy Farkas <t...@datatorrent.com>
Date:   2016-07-24T06:55:18Z

Finished adding managed state tests for SpillableByteMap

commit 43da17d9633dc2405b596b25697b6b4b0baef69f
Author: Timothy Farkas <t...@datatorrent.com>
Date:   2016-07-24T16:31:53Z

Added ManagedStateTests For SpillableArrayList

commit 1343ccf4ccc5099d7abcf7f99ab2ed648baeef08
Author: Timothy Farkas <t...@datatorrent.com>
Date:   2016-07-24T16:49:00Z

Added managed state tests for SpillableArrayListMultimap

commit 57c4e5e3c3e019613f31753be5052c32ba762e53
Author: Timothy Farkas <t...@datatorrent.com>
Date:   2016-07-24T16:57:35Z

Added ManagedStateTest for SpillableComplexComponent

commit ebc822c4ba8752f464395cf01221addc6d34
Author: Timothy Farkas <t...@datatorrent.com>
Date:   2016-07-27T06:38:51Z

Fixed broken containsKey method and added test

commit 983924215af2af7b683a3b654c320b7026b3165d
Author: David Yan <da...@datatorrent.com>
Date:   2016-07-28T17:27:01Z

isolated unit test for spillablebytemapimpl recovery

commit 68ca96332dce24aa739e547ba87058dc35bb56df
Author: Timothy Farkas <t...@datatorrent.com>
Date:   2016-07-29T08:10:28Z

Testing

commit a7796d61d80111faa24543ddc05b17c2944138be
Author: Timothy Farkas <t...@datatorrent.com>
Date:   2016-07-31T18:44:23Z

Added recovery test for SpillableArrayListMultimap and added license headers

commit b3b4aacf44127c79553fc289f1f9e1f7c17b2ec6
Author: Timothy Farkas <t...@datatorrent.com>
Date:   2016-08-01T03:34:18Z

 - Fixed bug
 - Added more recovery tests






[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

2016-08-03 Thread ilooner
Github user ilooner commented on a diff in the pull request:

https://github.com/apache/apex-core/pull/364#discussion_r73450775
  
--- Diff: 
engine/src/test/java/com/datatorrent/stram/engine/OperatorContextTest.java ---
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.stram.engine;
+
+import javax.validation.constraints.NotNull;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.stram.api.OperatorDeployInfo;
+
+public class OperatorContextTest
+{
+  private static boolean foundOperatorName;
+
+  @Test
+  public void testInjectionOfOperatorName() throws Exception
+  {
+    final LocalMode lma = LocalMode.newInstance();
+    StreamingApplication testApp = new StreamingApplication()
+    {
+      @Override
+      public void populateDAG(DAG dag, Configuration conf)
+      {
+        MockInputOperator input = dag.addOperator("input", new MockInputOperator());
+        GenericNodeTest.GenericOperator output = dag.addOperator("output", new GenericNodeTest.GenericOperator());
--- End diff --

Critical catch. nice job @gauravgopi123 👍 




[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

2016-07-31 Thread ilooner
Github user ilooner commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r72913406
  
--- Diff: 
library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImplTest.java
 ---
@@ -0,0 +1,458 @@
+package org.apache.apex.malhar.lib.state.spillable;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
+import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Created by tfarkas on 6/19/16.
+ */
+public class SpillableArrayListImplTest
+{
+  public static final byte[] ID1 = new byte[]{(byte)0};
+  public static final byte[] ID2 = new byte[]{(byte)1};
+
+  @Rule
+  public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta();
+
+  @Test
+  public void simpleAddGetAndSetTest1()
+  {
+    InMemSpillableStateStore store = new InMemSpillableStateStore();
+
+    simpleAddGetAndSetTest1Helper(store);
+  }
+
+  @Test
+  public void simpleAddGetAndSetManagedStateTest1()
+  {
+    simpleAddGetAndSetTest1Helper(testMeta.store);
+  }
+
+  public void simpleAddGetAndSetTest1Helper(SpillableStateStore store)
+  {
+    SpillableArrayListImpl<String> list = new SpillableArrayListImpl<>(0L, ID1, store,
+        new SerdeStringSlice(), 1);
+
+    store.setup(testMeta.operatorContext);
+    list.setup(testMeta.operatorContext);
+
+    long windowId = 0L;
+    store.beginWindow(windowId);
+    list.beginWindow(windowId);
+    windowId++;
--- End diff --

Yes will make change




[GitHub] apex-core issue #364: APEXCORE-448 Made operator name available in operator ...

2016-07-31 Thread ilooner
Github user ilooner commented on the issue:

https://github.com/apache/apex-core/pull/364
  
@chandnisingh lgtm once unit test is updated




[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

2016-07-31 Thread ilooner
Github user ilooner commented on a diff in the pull request:

https://github.com/apache/apex-core/pull/364#discussion_r72900235
  
--- Diff: 
engine/src/test/java/com/datatorrent/stram/engine/OperatorContextTest.java ---
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.stram.engine;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.common.util.BaseOperator;
+
+public class OperatorContextTest
+{
+
+  @Test
+  public void testInjectionOfOperatorName() throws Exception
+  {
+    final LocalMode lma = LocalMode.newInstance();
+    final CountDownLatch latch = new CountDownLatch(1);
+    StreamingApplication testApp = new StreamingApplication()
+    {
+      @Override
+      public void populateDAG(DAG dag, Configuration conf)
+      {
+        MockInputOperator input = dag.addOperator("input", new MockInputOperator());
+        input.countDownLatch = latch;
+        GenericNodeTest.GenericOperator output = dag.addOperator("output", new GenericNodeTest.GenericOperator());
+
+        dag.addStream("stream", input.output, output.ip1);
+      }
+    };
+
+    lma.prepareDAG(testApp, new Configuration());
+    LocalMode.Controller lc = lma.getController();
+    lc.runAsync();
+    latch.await();
+    lc.shutdown();
+  }
+
+  private static class MockInputOperator extends BaseOperator implements InputOperator, AutoMetric.Aggregator
+  {
+    private transient CountDownLatch countDownLatch;
+
+    @AutoMetric
+    String operatorName;
+
+    public final transient DefaultOutputPort output = new DefaultOutputPort<>();
+
+    @Override
+    public void setup(Context.OperatorContext context)
+    {
+      operatorName = Preconditions.checkNotNull(context.getOperatorName(), "operator name");
+    }
+
+    @Override
+    public void emitTuples()
+    {
+    }
+
+    @Override
+    public Map<String, Object> aggregate(long windowId, Collection<AutoMetric.PhysicalMetricsContext> physicalMetrics)
+    {
+      String name = (String)physicalMetrics.iterator().next().getMetrics().get("operatorName");
+      Assert.assertEquals("operator name", "input", name);
--- End diff --

Could we just set a static variable and check it in the test instead of 
using a latch and the assert?
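A minimal sketch of the suggested pattern in plain Java, without the Apex engine: the operator callback records what it saw in a static field, and the test checks that field after the run. `FakeOperator` and `runApplication` are hypothetical stand-ins for `MockInputOperator` and the `LocalMode` run; they are not Apex APIs.

```java
public class StaticFlagTestSketch
{
  /** Hypothetical stand-in for MockInputOperator. */
  static class FakeOperator
  {
    // Written on the "engine" thread and read on the test thread; volatile makes the
    // write visible to the test even when the run is not ended with a join().
    static volatile String observedOperatorName;

    void setup(String operatorName)
    {
      observedOperatorName = operatorName;
    }
  }

  /** Hypothetical stand-in for running the DAG: calls setup on another thread and waits. */
  static void runApplication(String operatorName) throws InterruptedException
  {
    Thread engine = new Thread(() -> new FakeOperator().setup(operatorName));
    engine.start();
    engine.join();
  }

  public static void main(String[] args) throws InterruptedException
  {
    FakeOperator.observedOperatorName = null; // reset shared state before the run
    runApplication("input");
    if (!"input".equals(FakeOperator.observedOperatorName)) {
      throw new AssertionError("operator name was " + FakeOperator.observedOperatorName);
    }
    System.out.println("operator name = " + FakeOperator.observedOperatorName);
  }
}
```

The trade-off: an assertion thrown inside `aggregate` runs on an engine thread and may not fail the JUnit test, whereas a static field checked on the test thread does; the cost is shared static state that must be reset between tests.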




[GitHub] apex-malhar pull request #345: APEXMALHAR-2130 REVIEW ONLY (WindowedOperator...

2016-07-27 Thread ilooner
Github user ilooner commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/345#discussion_r72389548
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java
 ---
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponentImpl;
+import org.apache.apex.malhar.lib.state.spillable.SpillableStateStore;
+import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SerdeKryoSlice;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.WindowedStorage;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * This is an implementation of WindowedPlainStorage that makes use of {@link Spillable} data structures
+ *
+ * @param <T> Type of the value per window
+ */
+public class SpillableWindowedPlainStorage<T> implements WindowedStorage.WindowedPlainStorage<T>
+{
+  private SpillableStateStore store;
+  private transient SpillableComplexComponentImpl sccImpl;
+  private long bucket;
+  private Serde<Window, Slice> windowSerde;
+  private Serde<T, Slice> valueSerde;
+
+  protected transient Spillable.SpillableByteMap<Window, T> internMap;
+
+  public SpillableWindowedPlainStorage()
+  {
+  }
+
+  public SpillableWindowedPlainStorage(long bucket, Serde<Window, Slice> windowSerde, Serde<T, Slice> valueSerde)
+  {
+    this.bucket = bucket;
+    this.windowSerde = windowSerde;
+    this.valueSerde = valueSerde;
+  }
+
+  public void setStore(SpillableStateStore store)
+  {
+    this.store = store;
+  }
+
+  public void setBucket(long bucket)
+  {
+    this.bucket = bucket;
+  }
+
+  public void setWindowSerde(Serde<Window, Slice> windowSerde)
+  {
+    this.windowSerde = windowSerde;
+  }
+
+  public void setValueSerde(Serde<T, Slice> valueSerde)
+  {
+    this.valueSerde = valueSerde;
+  }
+
+  @Override
+  public void put(Window window, T value)
+  {
+    internMap.put(window, value);
+  }
+
+  @Override
+  public T get(Window window)
+  {
+    return internMap.get(window);
+  }
+
+  @Override
+  public Iterable<Map.Entry<Window, T>> entrySet()
+  {
+    return internMap.entrySet();
+  }
+
+  @Override
+  public Iterator<Map.Entry<Window, T>> iterator()
+  {
+    return internMap.entrySet().iterator();
+  }
+
+  @Override
+  public boolean containsWindow(Window window)
+  {
+    return internMap.containsKey(window);
+  }
+
+  @Override
+  public long size()
+  {
+    return internMap.size();
+  }
+
+  @Override
+  public void remove(Window window)
+  {
+    internMap.remove(window);
+  }
+
+  @Override
+  public void migrateWindow(Window fromWindow, Window toWindow)
+  {
+    internMap.put(toWindow, internMap.remove(fromWindow));
+  }
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    if (store == null) {
+      // provide a default store
+      store = new ManagedStateSpillableStateStore();
+    }
+    if (bucket == 0) {
+      // choose a bucket that is almost guaranteed to be unique
+      bucket = (context.getValue(Context.DAGContext.APPLICATION_NAME) + "#" + context.getId()).hashCode();
+    }
+    // se

[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

2016-07-25 Thread ilooner
Github user ilooner commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r72124180
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java
 ---
@@ -0,0 +1,262 @@
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.utils.serde.PassThruByteArraySliceSerde;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
+import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multiset;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Created by tfarkas on 6/12/16.
+ */
+public class SpillableByteArrayListMultimapImpl<K, V> implements Spillable.SpillableByteArrayListMultimap<K, V>,
+    Spillable.SpillableComponent
+{
+  public static final int DEFAULT_BATCH_SIZE = 1000;
+  public static final byte[] SIZE_KEY_SUFFIX = new byte[]{(byte)0, (byte)0, (byte)0};
+
+  private int batchSize = DEFAULT_BATCH_SIZE;
+
+  private WindowBoundedMapCache<K, SpillableArrayListImpl<V>> cache = new WindowBoundedMapCache<>();
+
+  @NotNull
+  private SpillableByteMapImpl<byte[], Integer> map;
+
+  private SpillableStateStore store;
+  private byte[] identifier;
+  private long bucket;
+  private Serde<K, Slice> serdeKey;
+  private Serde<V, Slice> serdeValue;
+
+  private boolean isRunning = false;
+  private boolean isInWindow = false;
+
+  public SpillableByteArrayListMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket,
+      Serde<K, Slice> serdeKey,
+      Serde<V, Slice> serdeValue)
+  {
+    this.store = Preconditions.checkNotNull(store);
+    this.identifier = Preconditions.checkNotNull(identifier);
+    this.bucket = bucket;
+    this.serdeKey = Preconditions.checkNotNull(serdeKey);
+    this.serdeValue = Preconditions.checkNotNull(serdeValue);
+
+    map = new SpillableByteMapImpl(store, identifier, bucket, new PassThruByteArraySliceSerde(), new SerdeIntSlice());
--- End diff --

No it shouldn't. That error existed before but I fixed it with my latest 
commits. Do you have the latest? If so I'll check tonight to see if I can 
reproduce it in my tests.




[GitHub] apex-malhar pull request #322: APEXMALHAR-2063 Made window data manager use ...

2016-07-24 Thread ilooner
Github user ilooner commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/322#discussion_r71995236
  
--- Diff: library/src/main/java/org/apache/apex/malhar/lib/wal/WindowDataManager.java ---
@@ -126,36 +146,24 @@ public void teardown()
 }
 
 @Override
-public void save(Object object, int operatorId, long windowId) throws IOException
+public void save(Object object, long windowId) throws IOException
 {
 }
 
 @Override
-public Object load(int operatorId, long windowId) throws IOException
+public Object retrieve(long windowId) throws IOException
 {
   return null;
 }
 
 @Override
-public void delete(int operatorId, long windowId) throws IOException
-{
-}
-
-@Override
-public void deleteUpTo(int operatorId, long windowId) throws IOException
--- End diff --

These are no longer needed?




[GitHub] apex-malhar pull request #322: APEXMALHAR-2063 Made window data manager use ...

2016-07-24 Thread ilooner
Github user ilooner commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/322#discussion_r71995041
  
--- Diff: library/src/main/java/org/apache/apex/malhar/lib/wal/WindowDataManager.java ---
@@ -41,15 +41,42 @@
  *
  * @since 2.0.0
  */
-public interface WindowDataManager extends StorageAgent, Component
+public interface WindowDataManager extends Component
 {
   /**
+   * Save the state for a window id.
+   * @param object state
+   * @param windowId  window id
+   * @throws IOException
+   */
+  void save(Object object, long windowId) throws IOException;
+
+  /**
+   * Gets the object saved for the provided window id.
+   * Typically it is used to replay tuples of successive windows in input operators after failure.
+   *
+   * @param windowId window id
+   * @return saved state for the window id.
+   * @throws IOException
+   */
+  Object retrieve(long windowId) throws IOException;
+
+  /**
+   * Delete the artifact corresponding to the
--- End diff --

Can you complete the javadoc here?
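The interface diff above documents `save(Object, long)` and `retrieve(long)`. As a minimal sketch of that contract, assuming an in-memory backend, the hypothetical `InMemoryWindowStore` below keeps one state object per window id. Its `deleteUpTo(long)` is modeled on the `deleteUpTo` method removed elsewhere in this pull request and is not part of the new interface. An in-memory map cannot raise `IOException`, so the sketch omits the `throws` clause:

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical in-memory stand-in following the save/retrieve contract from
// the WindowDataManager diff; it does not implement the real interface.
class InMemoryWindowStore
{
  // Sorted by window id so that range deletes are cheap.
  private final NavigableMap<Long, Object> states = new TreeMap<>();

  /** Saves the state for a window id, replacing any earlier value. */
  public void save(Object object, long windowId)
  {
    states.put(windowId, object);
  }

  /** Returns the state saved for the window id, or null if none exists. */
  public Object retrieve(long windowId)
  {
    return states.get(windowId);
  }

  /** Deletes every saved window whose id is less than or equal to windowId. */
  public void deleteUpTo(long windowId)
  {
    // headMap(..., true) is a live view, so clearing it removes those entries.
    states.headMap(windowId, true).clear();
  }

  public int size()
  {
    return states.size();
  }
}
```

Replaying after a failure then amounts to calling `retrieve` for each successive window id until it returns null.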




[GitHub] apex-malhar pull request #345: REVIEW ONLY (WindowedOperator): splitting Win...

2016-07-17 Thread ilooner
Github user ilooner commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/345#discussion_r71096442
  
--- Diff: library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java ---
@@ -0,0 +1,135 @@
+package org.apache.apex.malhar.lib.window.impl;
--- End diff --

The same comments as above apply here as well.




[GitHub] apex-malhar pull request #345: REVIEW ONLY (WindowedOperator): splitting Win...

2016-07-17 Thread ilooner
Github user ilooner commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/345#discussion_r71095327
  
--- Diff: library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java ---
@@ -0,0 +1,136 @@
+package org.apache.apex.malhar.lib.window.impl;
+
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.concurrent.Immutable;
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.spillable.SpillableByteArrayListMultimapImpl;
+import org.apache.apex.malhar.lib.state.spillable.SpillableByteMapImpl;
+import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.WindowedStorage;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Created by david on 7/15/16.
+ */
+public class SpillableWindowedKeyedStorage<K, V> implements WindowedStorage.WindowedKeyedStorage<K, V>
+{
+  @NotNull
+  private final ManagedStateSpillableStateStore store;
+
+  protected final SpillableByteMapImpl<ImmutablePair<Window, K>, V> internValues;
+  protected final SpillableByteArrayListMultimapImpl<Window, K> internKeys;
+
+  private SpillableWindowedKeyedStorage()
+  {
+    // for kryo
+    store = null;
+    internValues = null;
+    internKeys = null;
+  }
+
+  public SpillableWindowedKeyedStorage(long bucket, byte[] identifier, Serde<Window, Slice> serdeWindow, Serde<K, Slice> serdeKey, Serde<ImmutablePair<Window, K>, Slice> serdeWindowKey, Serde<V, Slice> serdeValue)
+  {
+    store = new ManagedStateSpillableStateStore();
+    store.getCheckpointManager().setNeedBucketFile(false);
+    internValues = new SpillableByteMapImpl<>(store, identifier, bucket, serdeWindowKey, serdeValue);
+    internKeys = new SpillableByteArrayListMultimapImpl<>(store, identifier, bucket, serdeWindow, serdeKey);
+  }
+
+  @Override
+  public boolean containsWindow(Window window)
+  {
+    return internKeys.containsKey(window);
+  }
+
+  @Override
+  public long size()
+  {
+    return internKeys.size();
+  }
+
+  @Override
+  public void remove(Window window)
+  {
+    List<K> keys = internKeys.get(window);
+    for (K key : keys) {
+      internValues.remove(new ImmutablePair<>(window, key));
+    }
+    internKeys.removeAll(window);
+  }
+
+  @Override
+  public void migrateWindow(Window fromWindow, Window toWindow)
+  {
+    List<K> keys = internKeys.get(fromWindow);
+    internValues.remove(toWindow);
+    for (K key : keys) {
+      internKeys.put(toWindow, key);
+      ImmutablePair<Window, K> oldKey = new ImmutablePair<>(fromWindow, key);
+      ImmutablePair<Window, K> newKey = new ImmutablePair<>(toWindow, key);
+
+      V value = internValues.get(oldKey);
+      internValues.remove(oldKey);
+      internValues.put(newKey, value);
+    }
+    internKeys.removeAll(fromWindow);
+  }
+
+  @Override
+  public void beginApexWindow(long windowId)
+  {
+    store.beginWindow(windowId);

This should be spillableComplexComponent.beginWindow(windowId) rather than store.beginWindow(windowId).
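A minimal sketch of the delegation suggested above, assuming a component that exposes `beginWindow(long)` and `endWindow()` in the style of Apex operator callbacks. `WindowAware`, `DelegatingStorage`, and `RecordingComponent` are hypothetical names, and `endApexWindow(long)` is assumed to mirror the `beginApexWindow(long)` shown in the diff:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the window callbacks of a spillable component.
interface WindowAware
{
  void beginWindow(long windowId);

  void endWindow();
}

// Hypothetical storage that forwards Apex window boundaries to the component
// that owns its spillable data structures, instead of to a specific store.
class DelegatingStorage
{
  private final WindowAware component;

  DelegatingStorage(WindowAware component)
  {
    this.component = component;
  }

  public void beginApexWindow(long windowId)
  {
    component.beginWindow(windowId);
  }

  public void endApexWindow(long windowId)
  {
    component.endWindow();
  }
}

// Test double that records each callback it receives.
class RecordingComponent implements WindowAware
{
  final List<String> calls = new ArrayList<>();

  @Override
  public void beginWindow(long windowId)
  {
    calls.add("begin:" + windowId);
  }

  @Override
  public void endWindow()
  {
    calls.add("end");
  }
}
```

Because the storage only forwards window boundaries, it stays unaware of which backend handles state at those boundaries.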




[GitHub] apex-malhar pull request #345: REVIEW ONLY (WindowedOperator): splitting Win...

2016-07-17 Thread ilooner
Github user ilooner commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/345#discussion_r71095257
  
--- Diff: library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java ---
@@ -0,0 +1,136 @@
+package org.apache.apex.malhar.lib.window.impl;
+
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.concurrent.Immutable;
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.spillable.SpillableByteArrayListMultimapImpl;
+import org.apache.apex.malhar.lib.state.spillable.SpillableByteMapImpl;
+import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.WindowedStorage;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Created by david on 7/15/16.
+ */
+public class SpillableWindowedKeyedStorage<K, V> implements WindowedStorage.WindowedKeyedStorage<K, V>
+{
+  @NotNull
+  private final ManagedStateSpillableStateStore store;
--- End diff --

The field here should be of type SpillableComplexComponent; that way
ManagedState isn't hardcoded.
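A minimal sketch of the suggested decoupling, assuming the storage obtains its maps from an injected component instead of constructing `ManagedStateSpillableStateStore` itself. `MapFactory`, `InMemoryMapFactory`, and `KeyedWindowStorage` are hypothetical stand-ins, not the real `SpillableComplexComponent` API, and plain `java.util.Map` replaces the spillable maps:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a component such as SpillableComplexComponent:
// it creates the maps a storage class needs, so the storage never names a
// concrete backend like ManagedState.
interface MapFactory
{
  <K, V> Map<K, V> newMap();
}

// In-memory backend, e.g. for unit tests.
class InMemoryMapFactory implements MapFactory
{
  @Override
  public <K, V> Map<K, V> newMap()
  {
    return new HashMap<>();
  }
}

// Hypothetical keyed storage that depends only on the MapFactory interface.
class KeyedWindowStorage<W, K, V>
{
  private final MapFactory factory;
  private final Map<W, Map<K, V>> byWindow;

  KeyedWindowStorage(MapFactory factory)
  {
    this.factory = factory;
    this.byWindow = factory.newMap();
  }

  void put(W window, K key, V value)
  {
    // Per-window maps also come from the injected factory.
    byWindow.computeIfAbsent(window, w -> factory.<K, V>newMap()).put(key, value);
  }

  V get(W window, K key)
  {
    Map<K, V> values = byWindow.get(window);
    return values == null ? null : values.get(key);
  }

  boolean containsWindow(W window)
  {
    return byWindow.containsKey(window);
  }
}
```

Swapping `InMemoryMapFactory` for a spillable backend then requires no change to `KeyedWindowStorage`.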




[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

2016-06-21 Thread ilooner
GitHub user ilooner opened a pull request:

https://github.com/apache/apex-malhar/pull/324

Spillable Datastructures PR for review only



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ilooner/incubator-apex-malhar APEXMALHAR-2048_pull

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-malhar/pull/324.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #324


commit c2c3f0acfcdf033a0e3044967ab3f8048f719259
Author: Timothy Farkas <t...@datatorrent.com>
Date:   2016-06-05T00:11:20Z

- Intermediate commit.

commit 1bee1ed0308470ff35a739ab6f9e94d53debddb8
Author: Timothy Farkas <t...@datatorrent.com>
Date:   2016-06-13T06:03:21Z

Intermediate commit

commit 60acf68b96f2145af3d90b410c6b20613347f881
Author: Timothy Farkas <t...@datatorrent.com>
Date:   2016-06-21T06:58:09Z

Intermediate commit



