[jira] [Commented] (APEXCORE-448) Make operator name available in OperatorContext

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXCORE-448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15405344#comment-15405344
 ] 

ASF GitHub Bot commented on APEXCORE-448:
-

Github user chandnisingh commented on the issue:

https://github.com/apache/apex-core/pull/364
  
Changed the method to ```getName()```.


> Make operator name available in OperatorContext
> ---
>
> Key: APEXCORE-448
> URL: https://issues.apache.org/jira/browse/APEXCORE-448
> Project: Apache Apex Core
>  Issue Type: Improvement
>Reporter: Chandni Singh
>Assignee: Chandni Singh
>
> The OperatorContext needs the name of the logical operator, which can be used by
> WindowDataManager to create a unique path per logical operator.
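The following is a minimal sketch of the use case in the issue description. It assumes a hypothetical `OperatorInfo` interface that exposes the logical operator name through `getName()`, the method name the PR settled on, and a made-up base directory. It is not WindowDataManager's actual path logic.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical stand-in for the operator context: only getName() is modeled.
interface OperatorInfo
{
  String getName();
}

final class WindowPaths
{
  private WindowPaths()
  {
  }

  // Appends the logical operator name to a shared base directory, so each
  // logical operator gets its own subdirectory (hypothetical layout).
  static Path perOperatorPath(String baseDir, OperatorInfo context)
  {
    return Paths.get(baseDir, context.getName());
  }

  public static void main(String[] args)
  {
    OperatorInfo join = () -> "innerJoin";
    OperatorInfo dedup = () -> "dedup";
    System.out.println(perOperatorPath("/app/windowData", join));
    System.out.println(perOperatorPath("/app/windowData", dedup));
  }
}
```

Whether the resulting paths are unique depends entirely on the logical names being distinct. `Paths.get` also skips empty path elements, so an operator with an empty name would resolve to the base directory itself.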



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] apex-core issue #364: APEXCORE-448 Made operator name available in operator ...

2016-08-02 Thread chandnisingh
Github user chandnisingh commented on the issue:

https://github.com/apache/apex-core/pull/364
  
Changed the method to ```getName()```.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...

2016-08-02 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73282037
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/join/POJOInnerJoinOperator.java ---
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.lang.reflect.Array;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.ClassUtils;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * Concrete implementation of AbstractManagedStateInnerJoinOperator and receives objects from both streams.
+ *
+ * @displayName POJO Inner Join Operator
+ * @tags join
+ */
+public class POJOInnerJoinOperator extends AbstractManagedStateInnerJoinOperator implements Operator.ActivationListener
+{
+  private static final transient Logger LOG = LoggerFactory.getLogger(POJOInnerJoinOperator.class);
+  private transient long timeIncrement;
+  private transient FieldObjectMap[] inputFieldObjects = (FieldObjectMap[])Array.newInstance(FieldObjectMap.class, 2);
+  protected Class outputClass;
+  private long time = System.currentTimeMillis();
+
+  @OutputPortFieldAnnotation(schemaRequired = true)
+  public final transient DefaultOutputPort outputPort = new DefaultOutputPort()
+  {
+    @Override
+    public void setup(Context.PortContext context)
+    {
+      outputClass = context.getValue(Context.PortContext.TUPLE_CLASS);
+    }
+  };
+
+  @InputPortFieldAnnotation(schemaRequired = true)
+  public transient DefaultInputPort input1 = new DefaultInputPort()
+  {
+    @Override
+    public void setup(Context.PortContext context)
+    {
+      inputFieldObjects[0].inputClass = context.getValue(Context.PortContext.TUPLE_CLASS);
+    }
+
+    @Override
+    public void process(Object tuple)
+    {
+      processTuple(tuple,true);
+    }
+
+    @Override
+    public StreamCodec getStreamCodec()
+    {
+      return getInnerJoinStreamCodec(true);
+    }
+  };
+
+  @InputPortFieldAnnotation(schemaRequired = true)
+  public transient DefaultInputPort input2 = new DefaultInputPort()
+  {
+    @Override
+    public void setup(Context.PortContext context)
+    {
+      inputFieldObjects[1].inputClass = context.getValue(Context.PortContext.TUPLE_CLASS);
+    }
+
+    @Override
+    public void process(Object tuple)
+    {
+      processTuple(tuple,false);
+    }
+
+    @Override
+    public StreamCodec getStreamCodec()
+    {
+      return getInnerJoinStreamCodec(false);
+    }
+  };
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    timeIncrement = context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) *
+      context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS);
+    super.setup(context);
+    for (int i = 0; i < 2; i++) {
+      inputFieldObjects[i] = new FieldObjectMap();
+    }
+  }
+
+  /**
+   * Extract the time value from the given tuple
+   * @param tuple given tuple
+   * @param isStream1Data Specifies whether the given tuple belongs to stream1 or not.
+   * @return
+   */
+  @Override
+  public long extractTime(Object tuple, boolean isStream1Data)
+  {
+    return timeFields == null ? time : (long)(isStream1Data ? inputFieldObjects[0].timeFieldGet.get(tuple) :

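The `setup()` hunk computes `timeIncrement` as `APPLICATION_WINDOW_COUNT * STREAMING_WINDOW_SIZE_MILLIS`, and `extractTime()` falls back to the operator's `time` field when no time fields are configured. The diff is cut off before it shows how `time` moves forward. The sketch below simply assumes that `time` advances by one `timeIncrement` per application window. `WindowClock` and its methods are hypothetical.

```java
// Hypothetical sketch: tuples without their own timestamp are stamped with an
// operator-maintained clock that advances by one application window at a time.
final class WindowClock
{
  private final long timeIncrement;
  private long time;

  WindowClock(long startMillis, int applicationWindowCount, long streamingWindowSizeMillis)
  {
    // Same computation as setup() in the diff: length of one application window in ms.
    this.timeIncrement = applicationWindowCount * streamingWindowSizeMillis;
    this.time = startMillis;
  }

  // Returns the tuple's own timestamp if present, otherwise the window clock.
  long extractTime(Long tupleTime)
  {
    return tupleTime == null ? time : tupleTime;
  }

  // Assumed to be called once per application window (e.g. from endWindow()).
  void endWindow()
  {
    time += timeIncrement;
  }

  long getTimeIncrement()
  {
    return timeIncrement;
  }
}
```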
[jira] [Commented] (APEXMALHAR-2100) Development of Inner Join Operator using Spillable Datastructures

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15405335#comment-15405335
 ] 

ASF GitHub Bot commented on APEXMALHAR-2100:


Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73282003
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java
 ---
@@ -0,0 +1,357 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multiset;
+
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Concrete implementation of SpillableByteArrayListMultimap which is needed for join operator.
+ *
+ * Properties:
+ * isKeyContainsMultiValue: Specifies whether the key has multiple values or not.
+ * timeBucket: Specifies the length of the time bucket.
+ *
+ */
+public class ManagedTimeStateMultiValue<K,V> implements Spillable.SpillableByteArrayListMultimap<K,V>
+{
+  private transient StreamCodec streamCodec = null;
+  private boolean isKeyContainsMultiValue = false;
+  private long timeBucket;
+  @NotNull
+  private ManagedTimeStateImpl store;
+
+  public ManagedTimeStateMultiValue()
+  {
+    if (streamCodec == null) {
+      streamCodec = new KryoSerializableStreamCodec();
+    }
+  }
+
+  public ManagedTimeStateMultiValue(@NotNull ManagedTimeStateImpl store, boolean isPrimaryKey)
+  {
+    this();
+    this.store = Preconditions.checkNotNull(store);
+    this.isKeyContainsMultiValue = isPrimaryKey;
+  }
+
+  /**
+   * Return the list of values from the store
+   * @param k given key
+   * @return list of values
+   */
+  @Override
+  public List<V> get(@Nullable K k)
+  {
+    List value = null;
+    Slice valueSlice = store.getSync(getBucketId(k), streamCodec.toByteArray(k));
+    if (isKeyContainsMultiValue) {
+      value = (List)streamCodec.fromByteArray(valueSlice);
+    }  else {
+      if (valueSlice == null || valueSlice.length == 0 || valueSlice.buffer == null) {
+        return null;
+      }
+      value = new ArrayList<>();
+      value.add((V)streamCodec.fromByteArray(valueSlice));
+    }
+    return  value;
+  }
+
+  /**
+   * Returns the Future from the store.
+   * @param k given key
+   * @return
+   */
+  public CompositeFuture getAsync(@Nullable K k)
+  {
+    return new CompositeFuture(store.getAsync(getBucketId(k), streamCodec.toByteArray(k)));
+  }
+
+  @Override
+  public Set<K> keySet()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Multiset<K> keys()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Collection<V> values()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Collection<Map.Entry<K, V>> entries()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public List<V> removeAll(@Nullable Object o)
+  {
+    throw new

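`get()` in the hunk above has two storage modes. When `isKeyContainsMultiValue` is set, the stored value is deserialized as a whole list. Otherwise a single stored value is wrapped in a one-element list, and a missing value yields `null`. The hypothetical `MultiValueStore` below models only that read/write contract in memory, with no serialization, buckets, or spilling, so it is an illustration rather than the ManagedState implementation. Unlike the hunk, it also returns `null` for a missing multi-value key instead of deserializing an empty slice.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory analogue of the two storage modes in get():
// multi-value keys store a whole list, single-value keys store one value
// that is wrapped in a one-element list on read.
final class MultiValueStore<K, V>
{
  private final Map<K, Object> store = new HashMap<>();
  private final boolean isKeyContainsMultiValue;

  MultiValueStore(boolean isKeyContainsMultiValue)
  {
    this.isKeyContainsMultiValue = isKeyContainsMultiValue;
  }

  @SuppressWarnings("unchecked")
  void put(K key, V value)
  {
    if (isKeyContainsMultiValue) {
      // Append to the list kept for this key, creating it on first use.
      List<V> values = (List<V>)store.computeIfAbsent(key, k -> new ArrayList<V>());
      values.add(value);
    } else {
      // A single-value key keeps only the latest value.
      store.put(key, value);
    }
  }

  @SuppressWarnings("unchecked")
  List<V> get(K key)
  {
    Object stored = store.get(key);
    if (stored == null) {
      return null;
    }
    if (isKeyContainsMultiValue) {
      return (List<V>)stored;
    }
    List<V> single = new ArrayList<>();
    single.add((V)stored);
    return single;
  }
}
```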
[jira] [Commented] (APEXMALHAR-2100) Development of Inner Join Operator using Spillable Datastructures

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15405336#comment-15405336
 ] 

ASF GitHub Bot commented on APEXMALHAR-2100:


Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73282037
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/join/POJOInnerJoinOperator.java ---
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.lang.reflect.Array;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.ClassUtils;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * Concrete implementation of AbstractManagedStateInnerJoinOperator and receives objects from both streams.
+ *
+ * @displayName POJO Inner Join Operator
+ * @tags join
+ */
+public class POJOInnerJoinOperator extends AbstractManagedStateInnerJoinOperator implements Operator.ActivationListener
+{
+  private static final transient Logger LOG = LoggerFactory.getLogger(POJOInnerJoinOperator.class);
+  private transient long timeIncrement;
+  private transient FieldObjectMap[] inputFieldObjects = (FieldObjectMap[])Array.newInstance(FieldObjectMap.class, 2);
+  protected Class outputClass;
+  private long time = System.currentTimeMillis();
+
+  @OutputPortFieldAnnotation(schemaRequired = true)
+  public final transient DefaultOutputPort outputPort = new DefaultOutputPort()
+  {
+    @Override
+    public void setup(Context.PortContext context)
+    {
+      outputClass = context.getValue(Context.PortContext.TUPLE_CLASS);
+    }
+  };
+
+  @InputPortFieldAnnotation(schemaRequired = true)
+  public transient DefaultInputPort input1 = new DefaultInputPort()
+  {
+    @Override
+    public void setup(Context.PortContext context)
+    {
+      inputFieldObjects[0].inputClass = context.getValue(Context.PortContext.TUPLE_CLASS);
+    }
+
+    @Override
+    public void process(Object tuple)
+    {
+      processTuple(tuple,true);
+    }
+
+    @Override
+    public StreamCodec getStreamCodec()
+    {
+      return getInnerJoinStreamCodec(true);
+    }
+  };
+
+  @InputPortFieldAnnotation(schemaRequired = true)
+  public transient DefaultInputPort input2 = new DefaultInputPort()
+  {
+    @Override
+    public void setup(Context.PortContext context)
+    {
+      inputFieldObjects[1].inputClass = context.getValue(Context.PortContext.TUPLE_CLASS);
+    }
+
+    @Override
+    public void process(Object tuple)
+    {
+      processTuple(tuple,false);
+    }
+
+    @Override
+    public StreamCodec getStreamCodec()
+    {
+      return getInnerJoinStreamCodec(false);
+    }
+  };
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    timeIncrement = context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) *
+      context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS);
+    super.setup(context);
+    for (int i = 0; i < 2; i++) {
+      inputFieldObjects[i] = new FieldObjectMap();
+    }
+  }
+
+  /**
+   * Extract the time value from the given tuple
+   * @param tuple given tuple
+   * @param isStream1Data Specifies whether the given tuple belongs to

[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...

2016-08-02 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73282003
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java
 ---
@@ -0,0 +1,357 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multiset;
+
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Concrete implementation of SpillableByteArrayListMultimap which is needed for join operator.
+ *
+ * Properties:
+ * isKeyContainsMultiValue: Specifies whether the key has multiple values or not.
+ * timeBucket: Specifies the length of the time bucket.
+ *
+ */
+public class ManagedTimeStateMultiValue<K,V> implements Spillable.SpillableByteArrayListMultimap<K,V>
+{
+  private transient StreamCodec streamCodec = null;
+  private boolean isKeyContainsMultiValue = false;
+  private long timeBucket;
+  @NotNull
+  private ManagedTimeStateImpl store;
+
+  public ManagedTimeStateMultiValue()
+  {
+    if (streamCodec == null) {
+      streamCodec = new KryoSerializableStreamCodec();
+    }
+  }
+
+  public ManagedTimeStateMultiValue(@NotNull ManagedTimeStateImpl store, boolean isPrimaryKey)
+  {
+    this();
+    this.store = Preconditions.checkNotNull(store);
+    this.isKeyContainsMultiValue = isPrimaryKey;
+  }
+
+  /**
+   * Return the list of values from the store
+   * @param k given key
+   * @return list of values
+   */
+  @Override
+  public List<V> get(@Nullable K k)
+  {
+    List value = null;
+    Slice valueSlice = store.getSync(getBucketId(k), streamCodec.toByteArray(k));
+    if (isKeyContainsMultiValue) {
+      value = (List)streamCodec.fromByteArray(valueSlice);
+    }  else {
+      if (valueSlice == null || valueSlice.length == 0 || valueSlice.buffer == null) {
+        return null;
+      }
+      value = new ArrayList<>();
+      value.add((V)streamCodec.fromByteArray(valueSlice));
+    }
+    return  value;
+  }
+
+  /**
+   * Returns the Future from the store.
+   * @param k given key
+   * @return
+   */
+  public CompositeFuture getAsync(@Nullable K k)
+  {
+    return new CompositeFuture(store.getAsync(getBucketId(k), streamCodec.toByteArray(k)));
+  }
+
+  @Override
+  public Set<K> keySet()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Multiset<K> keys()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Collection<V> values()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Collection<Map.Entry<K, V>> entries()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public List<V> removeAll(@Nullable Object o)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void clear()
+  {
+
+  }
+
+  @Override
+  public int size()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
 

[jira] [Commented] (APEXCORE-448) Make operator name available in OperatorContext

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXCORE-448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15405334#comment-15405334
 ] 

ASF GitHub Bot commented on APEXCORE-448:
-

Github user chandnisingh commented on a diff in the pull request:

https://github.com/apache/apex-core/pull/364#discussion_r73281978
  
--- Diff: 
engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java ---
@@ -84,12 +87,13 @@ public void setIdleTimeout(long idleTimeout)
    * @param attributes the value of attributes
    * @param parentContext
    */
-  public OperatorContext(int id, AttributeMap attributes, Context parentContext)
+  public OperatorContext(int id, String operatorName, AttributeMap attributes, Context parentContext)
   {
     super(attributes, parentContext);
     this.lastProcessedWindowId = Stateless.WINDOW_ID;
     this.id = id;
     this.stateless = super.getValue(OperatorContext.STATELESS);
+    this.operatorName = Preconditions.checkNotNull(operatorName, "operator name");
--- End diff --

I checked locally, in an application test, that an empty operator name is
allowed in the DAG. I don't think we can restrict it here if the DAG allows
adding an operator with an empty name.
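The point in the reply above is that a not-null check such as `Preconditions.checkNotNull` only guards against `null`. It never inspects the string, so an empty operator name still gets through. The stdlib-only sketch below uses `java.util.Objects.requireNonNull`, which behaves the same way for this purpose: it throws `NullPointerException` on `null` and otherwise returns its argument. `OperatorNameCheck` is a hypothetical helper.

```java
import java.util.Objects;

// Hypothetical helper showing that a not-null check rejects null names
// but accepts the empty string.
final class OperatorNameCheck
{
  private OperatorNameCheck()
  {
  }

  // Accepts any non-null name, including "".
  static String requireName(String operatorName)
  {
    return Objects.requireNonNull(operatorName, "operator name");
  }

  // Reports whether requireName() refuses the given name.
  static boolean isRejected(String operatorName)
  {
    try {
      requireName(operatorName);
      return false;
    } catch (NullPointerException e) {
      return true;
    }
  }

  public static void main(String[] args)
  {
    System.out.println(isRejected(null));   // true
    System.out.println(isRejected(""));     // false: empty names pass
    System.out.println(isRejected("join")); // false
  }
}
```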







[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

2016-08-02 Thread chandnisingh
Github user chandnisingh commented on a diff in the pull request:

https://github.com/apache/apex-core/pull/364#discussion_r73281978
  
--- Diff: 
engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java ---
@@ -84,12 +87,13 @@ public void setIdleTimeout(long idleTimeout)
    * @param attributes the value of attributes
    * @param parentContext
    */
-  public OperatorContext(int id, AttributeMap attributes, Context parentContext)
+  public OperatorContext(int id, String operatorName, AttributeMap attributes, Context parentContext)
   {
     super(attributes, parentContext);
     this.lastProcessedWindowId = Stateless.WINDOW_ID;
     this.id = id;
     this.stateless = super.getValue(OperatorContext.STATELESS);
+    this.operatorName = Preconditions.checkNotNull(operatorName, "operator name");
--- End diff --

I checked locally, in an application test, that an empty operator name is
allowed in the DAG. I don't think we can restrict it here if the DAG allows
adding an operator with an empty name.




[jira] [Commented] (APEXMALHAR-2100) Development of Inner Join Operator using Spillable Datastructures

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15405331#comment-15405331
 ] 

ASF GitHub Bot commented on APEXMALHAR-2100:


Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73281797
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/join/AbstractManagedStateInnerJoinOperator.java
 ---
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateImpl;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateMultiValue;
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+import org.apache.hadoop.fs.Path;
+import com.google.common.collect.Maps;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Operator;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+
+/**
+ * An abstract implementation of inner join operator over Managed state which extends from
+ * AbstractInnerJoinOperator.
+ *
+ * Properties:
+ * noOfBuckets: Number of buckets required for Managed state.
+ * bucketSpanTime: Indicates the length of the time bucket.
+ */
+public abstract class AbstractManagedStateInnerJoinOperator extends AbstractInnerJoinOperator implements
+    Operator.CheckpointNotificationListener, Operator.CheckpointListener,Operator.IdleTimeHandler
+{
+  private static final transient Logger LOG = LoggerFactory.getLogger(AbstractManagedStateInnerJoinOperator.class);
+  public static final String stateDir = "managedState";
+  public static final String stream1State = "stream1Data";
+  public static final String stream2State = "stream2Data";
+  private transient long sleepMillis;
+  private transient Map, Future> waitingEvents = Maps.newLinkedHashMap();
+  private int noOfBuckets = 1;
+  private Long bucketSpanTime;
+  protected ManagedTimeStateImpl stream1Store;
+  protected ManagedTimeStateImpl stream2Store;
+
+  /**
+   * Create Managed states and stores for both the streams.
+   */
+  @Override
+  public void createStores()
+  {
+    stream1Store = new ManagedTimeStateImpl();
+    stream2Store = new ManagedTimeStateImpl();
+    stream1Store.setNumBuckets(noOfBuckets);
+    stream2Store.setNumBuckets(noOfBuckets);
+    if (bucketSpanTime != null) {
--- End diff --

OK, I will make expiryTime mandatory.
I had a different use case in mind, but it is not valid in a streaming scenario.
I will use a single expiry time for both streams.
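The following is a minimal sketch of the plan in the reply. It assumes hypothetical `JoinStateConfig` and `StreamStore` classes. The expiry time has no default and is validated once, and the same value is applied to the state of both input streams. The sketch uses `java.time.Duration` to stay within the JDK, whereas the diff imports `org.joda.time.Duration`.

```java
import java.time.Duration;
import java.util.Objects;

// Hypothetical stand-in for a per-stream state store; only the expiry is modeled.
final class StreamStore
{
  long expiryMillis = -1;
}

final class JoinStateConfig
{
  // Mandatory: there is deliberately no default value.
  private Duration expiryTime;

  void setExpiryTime(Duration expiryTime)
  {
    this.expiryTime = expiryTime;
  }

  // Validates the mandatory expiry time and applies the same value to both streams.
  void configureStores(StreamStore stream1Store, StreamStore stream2Store)
  {
    Objects.requireNonNull(expiryTime, "expiryTime is mandatory");
    if (expiryTime.isZero() || expiryTime.isNegative()) {
      throw new IllegalArgumentException("expiryTime must be positive");
    }
    long millis = expiryTime.toMillis();
    stream1Store.expiryMillis = millis;
    stream2Store.expiryMillis = millis;
  }
}
```

Failing fast at configuration time turns a missing expiry into an immediate error. In a streaming join, state without any expiry would presumably keep growing.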


> Development of Inner Join Operator using Spillable Datastructures
> -
>
> Key: APEXMALHAR-2100
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2100
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: Chaitanya
>Assignee: Chaitanya
>






[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...

2016-08-02 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73281797
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/join/AbstractManagedStateInnerJoinOperator.java
 ---
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateImpl;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateMultiValue;
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+import org.apache.hadoop.fs.Path;
+import com.google.common.collect.Maps;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Operator;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+
+/**
+ * An abstract implementation of inner join operator over Managed state which extends from
+ * AbstractInnerJoinOperator.
+ *
+ * Properties:
+ * noOfBuckets: Number of buckets required for Managed state.
+ * bucketSpanTime: Indicates the length of the time bucket.
+ */
+public abstract class AbstractManagedStateInnerJoinOperator extends AbstractInnerJoinOperator implements
+    Operator.CheckpointNotificationListener, Operator.CheckpointListener,Operator.IdleTimeHandler
+{
+  private static final transient Logger LOG = LoggerFactory.getLogger(AbstractManagedStateInnerJoinOperator.class);
+  public static final String stateDir = "managedState";
+  public static final String stream1State = "stream1Data";
+  public static final String stream2State = "stream2Data";
+  private transient long sleepMillis;
+  private transient Map, Future> waitingEvents = Maps.newLinkedHashMap();
+  private int noOfBuckets = 1;
+  private Long bucketSpanTime;
+  protected ManagedTimeStateImpl stream1Store;
+  protected ManagedTimeStateImpl stream2Store;
+
+  /**
+   * Create Managed states and stores for both the streams.
+   */
+  @Override
+  public void createStores()
+  {
+    stream1Store = new ManagedTimeStateImpl();
+    stream2Store = new ManagedTimeStateImpl();
+    stream1Store.setNumBuckets(noOfBuckets);
+    stream2Store.setNumBuckets(noOfBuckets);
+    if (bucketSpanTime != null) {
--- End diff --

OK, I will make expiryTime mandatory.
I had a different use case in mind, but it is not valid in a streaming scenario.
I will use a single expiry time for both streams.




[jira] [Commented] (APEXMALHAR-2100) Development of Inner Join Operator using Spillable Datastructures

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15405330#comment-15405330
 ] 

ASF GitHub Bot commented on APEXMALHAR-2100:


Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73281761
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/join/POJOInnerJoinOperator.java ---
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.lang.reflect.Array;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.ClassUtils;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * Concrete implementation of AbstractManagedStateInnerJoinOperator and receives objects from both streams.
+ *
+ * @displayName POJO Inner Join Operator
+ * @tags join
+ */
+public class POJOInnerJoinOperator extends AbstractManagedStateInnerJoinOperator implements Operator.ActivationListener
+{
+  private static final transient Logger LOG = LoggerFactory.getLogger(POJOInnerJoinOperator.class);
+  private transient long timeIncrement;
+  private transient FieldObjectMap[] inputFieldObjects = (FieldObjectMap[])Array.newInstance(FieldObjectMap.class, 2);
+  protected Class outputClass;
+  private long time = System.currentTimeMillis();
+
+  @OutputPortFieldAnnotation(schemaRequired = true)
+  public final transient DefaultOutputPort outputPort = new DefaultOutputPort()
+  {
+@Override
+public void setup(Context.PortContext context)
+{
+  outputClass = context.getValue(Context.PortContext.TUPLE_CLASS);
+}
+  };
+
+  @InputPortFieldAnnotation(schemaRequired = true)
+  public transient DefaultInputPort input1 = new DefaultInputPort()
+  {
+@Override
+public void setup(Context.PortContext context)
+{
+  inputFieldObjects[0].inputClass = context.getValue(Context.PortContext.TUPLE_CLASS);
+}
+
+@Override
+public void process(Object tuple)
+{
+  processTuple(tuple,true);
+}
+
+@Override
+public StreamCodec getStreamCodec()
+{
+  return getInnerJoinStreamCodec(true);
+}
+  };
+
+  @InputPortFieldAnnotation(schemaRequired = true)
+  public transient DefaultInputPort input2 = new DefaultInputPort()
+  {
+@Override
+public void setup(Context.PortContext context)
+{
+  inputFieldObjects[1].inputClass = context.getValue(Context.PortContext.TUPLE_CLASS);
+}
+
+@Override
+public void process(Object tuple)
+{
+  processTuple(tuple,false);
+}
+
+@Override
+public StreamCodec getStreamCodec()
+{
+  return getInnerJoinStreamCodec(false);
+}
+  };
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+timeIncrement = context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) *
+  context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS);
+super.setup(context);
+for (int i = 0; i < 2; i++) {
+  inputFieldObjects[i] = new FieldObjectMap();
--- End diff --

No. The declaration only creates the array of FieldObjectMap type; the elements themselves still have to be instantiated.
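The point of the reply can be checked in isolation: `Array.newInstance(FieldObjectMap.class, 2)` allocates an array whose component type is `FieldObjectMap`, but both slots start out as `null`, so each element still has to be instantiated (as the loop in `setup()` does) before a field such as `inputClass` can be assigned on it. A minimal stand-alone sketch; `FieldObjectMap` here is a hypothetical placeholder with only an `inputClass` field, not the operator's real inner class.

```java
import java.lang.reflect.Array;

public class ArrayInitDemo
{
  // Hypothetical stand-in for the operator's FieldObjectMap helper class.
  public static class FieldObjectMap
  {
    Class<?> inputClass;
  }

  // Mirrors the field declaration: allocates the array, not its elements.
  public static FieldObjectMap[] allocate()
  {
    return (FieldObjectMap[])Array.newInstance(FieldObjectMap.class, 2);
  }

  // Mirrors the loop in setup(): fills every slot with a fresh instance.
  public static FieldObjectMap[] allocateAndFill()
  {
    FieldObjectMap[] maps = allocate();
    for (int i = 0; i < maps.length; i++) {
      maps[i] = new FieldObjectMap();
    }
    return maps;
  }

  // Counts the slots that still hold no object.
  public static int countNulls(Object[] array)
  {
    int nulls = 0;
    for (Object o : array) {
      if (o == null) {
        nulls++;
      }
    }
    return nulls;
  }

  public static void main(String[] args)
  {
    System.out.println("after declaration: " + countNulls(allocate()) + " null slots"); // 2
    System.out.println("after setup loop: " + countNulls(allocateAndFill()) + " null slots"); // 0
  }
}
```

Writing `new FieldObjectMap[2]` instead of the reflective call would behave the same way: either form allocates only the references.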


> Development of Inner Join Operator using Spillable Datastructures
> 

[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...

2016-08-02 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73281761
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/join/POJOInnerJoinOperator.java ---
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.lang.reflect.Array;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.ClassUtils;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * Concrete implementation of AbstractManagedStateInnerJoinOperator and receives objects from both streams.
+ *
+ * @displayName POJO Inner Join Operator
+ * @tags join
+ */
+public class POJOInnerJoinOperator extends AbstractManagedStateInnerJoinOperator implements Operator.ActivationListener
+{
+  private static final transient Logger LOG = LoggerFactory.getLogger(POJOInnerJoinOperator.class);
+  private transient long timeIncrement;
+  private transient FieldObjectMap[] inputFieldObjects = (FieldObjectMap[])Array.newInstance(FieldObjectMap.class, 2);
+  protected Class outputClass;
+  private long time = System.currentTimeMillis();
+
+  @OutputPortFieldAnnotation(schemaRequired = true)
+  public final transient DefaultOutputPort outputPort = new DefaultOutputPort()
+  {
+@Override
+public void setup(Context.PortContext context)
+{
+  outputClass = context.getValue(Context.PortContext.TUPLE_CLASS);
+}
+  };
+
+  @InputPortFieldAnnotation(schemaRequired = true)
+  public transient DefaultInputPort input1 = new DefaultInputPort()
+  {
+@Override
+public void setup(Context.PortContext context)
+{
+  inputFieldObjects[0].inputClass = context.getValue(Context.PortContext.TUPLE_CLASS);
+}
+
+@Override
+public void process(Object tuple)
+{
+  processTuple(tuple,true);
+}
+
+@Override
+public StreamCodec getStreamCodec()
+{
+  return getInnerJoinStreamCodec(true);
+}
+  };
+
+  @InputPortFieldAnnotation(schemaRequired = true)
+  public transient DefaultInputPort input2 = new DefaultInputPort()
+  {
+@Override
+public void setup(Context.PortContext context)
+{
+  inputFieldObjects[1].inputClass = context.getValue(Context.PortContext.TUPLE_CLASS);
+}
+
+@Override
+public void process(Object tuple)
+{
+  processTuple(tuple,false);
+}
+
+@Override
+public StreamCodec getStreamCodec()
+{
+  return getInnerJoinStreamCodec(false);
+}
+  };
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+timeIncrement = context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) *
+  context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS);
+super.setup(context);
+for (int i = 0; i < 2; i++) {
+  inputFieldObjects[i] = new FieldObjectMap();
--- End diff --

No. The declaration only creates the array of FieldObjectMap type; the elements themselves still have to be instantiated.




[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...

2016-08-02 Thread chandnisingh
Github user chandnisingh commented on a diff in the pull request:

https://github.com/apache/apex-core/pull/364#discussion_r73281443
  
--- Diff: 
engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java ---
@@ -84,12 +87,13 @@ public void setIdleTimeout(long idleTimeout)
* @param attributes the value of attributes
* @param parentContext
*/
-  public OperatorContext(int id, AttributeMap attributes, Context parentContext)
+  public OperatorContext(int id, String operatorName, AttributeMap attributes, Context parentContext)
--- End diff --

A lot of tests create an OperatorContext instance without an OperatorDeployInfo,
and OperatorDeployInfo does not expose methods to set its fields (id, name,
attributes).
So if we change this, it will require a lot more changes.
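A simplified sketch of the constructor shape in the diff: the logical operator name is passed as a plain argument, so tests can keep constructing contexts from simple values, and it is exposed through `getName()`, the accessor name settled on in the PR. `OperatorContextSketch`, its `Map`-based attributes and the omission of the parent context are hypothetical simplifications, not the engine class.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for com.datatorrent.stram.engine.OperatorContext.
public class OperatorContextSketch
{
  private final int id;
  private final String name;
  private final Map<String, Object> attributes;

  // Mirrors the new constructor shape: (id, operatorName, attributes, ...).
  public OperatorContextSketch(int id, String operatorName, Map<String, Object> attributes)
  {
    this.id = id;
    this.name = operatorName;
    Map<String, Object> copy = new HashMap<>();
    if (attributes != null) {
      copy.putAll(attributes);
    }
    this.attributes = Collections.unmodifiableMap(copy);
  }

  public int getId()
  {
    return id;
  }

  // Name of the logical operator this context belongs to.
  public String getName()
  {
    return name;
  }

  public Object getAttribute(String key)
  {
    return attributes.get(key);
  }

  public static void main(String[] args)
  {
    // A test can build a context directly, without any deploy-info object.
    OperatorContextSketch ctx = new OperatorContextSketch(1, "dedup", null);
    System.out.println(ctx.getId() + " -> " + ctx.getName());
  }
}
```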




[jira] [Commented] (APEXCORE-448) Make operator name available in OperatorContext

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXCORE-448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15405327#comment-15405327
 ] 

ASF GitHub Bot commented on APEXCORE-448:
-

Github user chandnisingh commented on a diff in the pull request:

https://github.com/apache/apex-core/pull/364#discussion_r73281443
  
--- Diff: 
engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java ---
@@ -84,12 +87,13 @@ public void setIdleTimeout(long idleTimeout)
* @param attributes the value of attributes
* @param parentContext
*/
-  public OperatorContext(int id, AttributeMap attributes, Context parentContext)
+  public OperatorContext(int id, String operatorName, AttributeMap attributes, Context parentContext)
--- End diff --

A lot of tests create an OperatorContext instance without an OperatorDeployInfo,
and OperatorDeployInfo does not expose methods to set its fields (id, name,
attributes).
So if we change this, it will require a lot more changes.


> Make operator name available in OperatorContext
> ---
>
> Key: APEXCORE-448
> URL: https://issues.apache.org/jira/browse/APEXCORE-448
> Project: Apache Apex Core
>  Issue Type: Improvement
>Reporter: Chandni Singh
>Assignee: Chandni Singh
>
> The name of the logical operator is needed in the OperatorContext so that
> WindowDataManager can use it to create a unique path per logical operator.
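To illustrate the motivation: every partition of a logical operator sees the same logical name, so a path derived from that name is shared by all of its partitions and differs between logical operators. The helper below is hypothetical and uses local `java.nio` paths for brevity; it does not reproduce WindowDataManager's actual directory layout.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

public class WindowDataPathSketch
{
  // Hypothetical helper: one directory per logical operator, keyed by its name.
  public static Path pathFor(String basePath, String logicalOperatorName)
  {
    if (logicalOperatorName == null || logicalOperatorName.isEmpty()) {
      throw new IllegalArgumentException("logical operator name is required");
    }
    return Paths.get(basePath, logicalOperatorName);
  }

  public static void main(String[] args)
  {
    // Partitions of the same logical operator pass the same name, so they share a directory.
    System.out.println(pathFor("/tmp/wal", "dedup").equals(pathFor("/tmp/wal", "dedup"))); // true
    // Different logical operators resolve to different directories.
    System.out.println(pathFor("/tmp/wal", "dedup").equals(pathFor("/tmp/wal", "join"))); // false
  }
}
```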





[jira] [Created] (APEXCORE-500) Allow specification of non-heap memory size

2016-08-02 Thread Pramod Immaneni (JIRA)
Pramod Immaneni created APEXCORE-500:


 Summary: Allow specification of non-heap memory size
 Key: APEXCORE-500
 URL: https://issues.apache.org/jira/browse/APEXCORE-500
 Project: Apache Apex Core
  Issue Type: Improvement
Reporter: Pramod Immaneni
Assignee: Pramod Immaneni


Currently there are two mechanisms affecting the non-heap size, neither of which
is direct. By default, the non-heap size is 0.25 of the container size, capped
at 1GB. Alternatively, when the heap memory is explicitly set to a fixed value,
the remaining memory is available for non-heap use.

If a non-heap size larger than the default is desired, today the heap size has
to be specified for each container. This size can vary from container to
container depending on the operators it contains, so the user has to calculate
and set each of these heap sizes. Instead, it would be easier to have an option
to directly specify a single non-heap size.
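The two existing mechanisms and the proposed option reduce to simple arithmetic. A sketch in megabytes, where only the 0.25 ratio, the 1GB cap and the "remaining memory goes to non-heap" rule come from the description above; the class, the method names and the use of a null `Long` for "heap not specified" are hypothetical.

```java
public class NonHeapSizing
{
  static final long MAX_DEFAULT_NON_HEAP_MB = 1024;  // 1GB cap from the issue text
  static final double DEFAULT_NON_HEAP_RATIO = 0.25; // default share of the container

  // Current behaviour: an explicit heap leaves the remainder for non-heap;
  // otherwise non-heap is 25% of the container, capped at 1GB.
  public static long currentNonHeapMb(long containerMb, Long explicitHeapMb)
  {
    if (explicitHeapMb != null) {
      return containerMb - explicitHeapMb;
    }
    return Math.min((long)(containerMb * DEFAULT_NON_HEAP_RATIO), MAX_DEFAULT_NON_HEAP_MB);
  }

  // Proposed option: the user states one non-heap size and each container's heap is derived from it.
  public static long heapForRequestedNonHeapMb(long containerMb, long nonHeapMb)
  {
    if (nonHeapMb >= containerMb) {
      throw new IllegalArgumentException("non-heap size must be smaller than the container size");
    }
    return containerMb - nonHeapMb;
  }

  public static void main(String[] args)
  {
    System.out.println(currentNonHeapMb(2048, null));          // 512
    System.out.println(currentNonHeapMb(8192, null));          // 1024 (capped)
    System.out.println(currentNonHeapMb(4096, 3072L));         // 1024
    System.out.println(heapForRequestedNonHeapMb(4096, 1536)); // 2560
  }
}
```

With a single non-heap setting, containers of different sizes each get a heap computed from their own size, which is the per-container calculation the issue wants to spare the user.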





[jira] [Created] (APEXMALHAR-2173) add a constraint validation for subject in JMSBase

2016-08-02 Thread Sanjay M Pujare (JIRA)
Sanjay M Pujare created APEXMALHAR-2173:
---

 Summary: add a constraint validation for subject in JMSBase
 Key: APEXMALHAR-2173
 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2173
 Project: Apache Apex Malhar
  Issue Type: Improvement
Reporter: Sanjay M Pujare
Priority: Minor


In com/datatorrent/lib/io/jms/JMSBase, add a NotNull constraint check for the
member "subject".





Re: custom JAVA_HOME

2016-08-02 Thread Pramod Immaneni
How about allowing specification of all the non-final environment variables
supported by YARN, described below?

http://atetric.com/atetric/javadoc/org.apache.hadoop/hadoop-yarn-api/0.23.3/org/apache/hadoop/yarn/api/ApplicationConstants.Environment.html

Thanks

On Tue, Aug 2, 2016 at 3:43 PM, Vlad Rozov  wrote:

> Should Apex add JAVA_HOME to DAGContext and allow application to specify
> which JDK to use if there are multiple JDK installations on Hadoop cluster?
> Yarn already supports custom JAVA_HOME (please see
> https://issues.apache.org/jira/browse/YARN-2481).
>
> Vlad
>
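One way to picture the proposal is a launcher that copies application-specified variables such as `JAVA_HOME` into the container environment while refusing to override variables YARN treats as final. A stdlib-only sketch; `ContainerEnvSketch` is hypothetical, and the contents of `FINAL_VARS` are placeholders, since the authoritative final/non-final split is the one documented in the YARN `ApplicationConstants.Environment` javadoc linked above.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

public class ContainerEnvSketch
{
  // Placeholder set: stands in for variables that applications may not override.
  static final Set<String> FINAL_VARS =
      Collections.unmodifiableSet(new HashSet<>(Arrays.asList("USER", "LOGNAME", "PWD")));

  // Merge application-specified variables into the base environment, skipping final ones.
  public static Map<String, String> buildEnv(Map<String, String> base, Map<String, String> appOverrides)
  {
    Map<String, String> env = new LinkedHashMap<>(base);
    for (Map.Entry<String, String> e : appOverrides.entrySet()) {
      if (!FINAL_VARS.contains(e.getKey())) {
        env.put(e.getKey(), e.getValue());
      }
    }
    return env;
  }

  public static void main(String[] args)
  {
    Map<String, String> base = new LinkedHashMap<>();
    base.put("JAVA_HOME", "/usr/lib/jvm/default");
    base.put("USER", "yarn");

    Map<String, String> overrides = new LinkedHashMap<>();
    overrides.put("JAVA_HOME", "/usr/lib/jvm/java-8");
    overrides.put("USER", "someone-else");

    // prints {JAVA_HOME=/usr/lib/jvm/java-8, USER=yarn}
    System.out.println(buildEnv(base, overrides));
  }
}
```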


[jira] [Commented] (APEXCORE-495) Enhancing the configuration package to store apps

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXCORE-495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15404942#comment-15404942
 ] 

ASF GitHub Bot commented on APEXCORE-495:
-

Github user sandeshh commented on the issue:

https://github.com/apache/apex-core/pull/360
  
@davidyan74 Please review.


> Enhancing the configuration package to store apps
> -
>
> Key: APEXCORE-495
> URL: https://issues.apache.org/jira/browse/APEXCORE-495
> Project: Apache Apex Core
>  Issue Type: Improvement
>Reporter: Sandesh
>Assignee: Sandesh
>
> Apex supports configuration packages, which separate the application package
> from the actual configuration (http://docs.datatorrent.com/configuration_packages/).
> We want to enhance the configuration package by adding support for adding apps
> (in JSON format).
> Use case: multiple users share the same app package, but each has a different
> view of the golden copy of the app package.
> Note: this feature was requested by an Apex user.





[GitHub] apex-core issue #360: APEXCORE-495 supporting apps in config package.

2016-08-02 Thread sandeshh
Github user sandeshh commented on the issue:

https://github.com/apache/apex-core/pull/360
  
@davidyan74 Please review.




custom JAVA_HOME

2016-08-02 Thread Vlad Rozov
Should Apex add JAVA_HOME to DAGContext and allow an application to specify
which JDK to use when there are multiple JDK installations on the Hadoop
cluster? YARN already supports a custom JAVA_HOME (please see
https://issues.apache.org/jira/browse/YARN-2481).


Vlad


[jira] [Commented] (APEXMALHAR-1701) Deduper : create a deduper backed by Managed State

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15404037#comment-15404037
 ] 

ASF GitHub Bot commented on APEXMALHAR-1701:


Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73161100
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java ---
@@ -0,0 +1,459 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
+import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Futures;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Abstract class which allows de-duplicating incoming tuples based on a configured key.
+ * Also supports an expiry mechanism based on a configurable expiry period configured using {@link TimeBucketAssigner}
+ * in {@link ManagedTimeUnifiedStateImpl}.
+ * The following steps are used in identifying the state of a particular tuple:
+ * 1. Check if the time of the tuple falls in an expired bucket. If so, the tuple is expired.
+ * 2. If the tuple is a valid event, it is checked in the store whether the same key already exists in the
+ * time bucket identified by the event time. If so, the tuple is a duplicate.
+ * 3. Otherwise the tuple is a unique tuple.
+ *
+ * @param  type of events
+ */
+@Evolving
+@OperatorAnnotation(checkpointableWithinAppWindow = false)
+public abstract class AbstractDeduper
+implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener
+{
+  /**
+   * The input port on which events are received.
+   */
+  public final transient DefaultInputPort input = new DefaultInputPort()
+  {
+@Override
+public final void process(T tuple)
+{
+  processTuple(tuple);
+}
+  };
+
+  /**
+   * The output port on which deduped events are emitted.
+   */
+  public final transient DefaultOutputPort unique = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which duplicate events are emitted.
+   */
+  public final transient DefaultOutputPort duplicate = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which expired events are emitted.
+   */
+  public final transient DefaultOutputPort expired = new DefaultOutputPort<>();
+
+  /**
+   * Whether or not the order of tuples should be maintained.
+   * Making this "true" might entail some cost in performance, but makes the operator idempotent.
+   */
+  private boolean preserveTupleOrder = true;
+
+  @NotNull
+  protected final ManagedTimeUnifiedStateImpl 

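The three steps listed in the `AbstractDeduper` javadoc can be sketched with plain collections. This is not the managed-state implementation: time buckets are modelled as an in-memory `Map<Long, Set<K>>`, the bucket span and expiry period are constructor parameters, and the expiry horizon is measured from the newest event time seen so far; all of these are assumptions made for illustration, and `DedupSketch` is a hypothetical name.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class DedupSketch<K>
{
  public enum Result { UNIQUE, DUPLICATE, EXPIRED }

  private final long bucketSpanMillis;
  private final long expiryMillis;
  // Hypothetical stand-in for managed state: bucket index -> keys seen in that bucket.
  private final Map<Long, Set<K>> buckets = new HashMap<>();
  private long maxEventTime = Long.MIN_VALUE;

  public DedupSketch(long bucketSpanMillis, long expiryMillis)
  {
    this.bucketSpanMillis = bucketSpanMillis;
    this.expiryMillis = expiryMillis;
  }

  public Result process(K key, long eventTime)
  {
    maxEventTime = Math.max(maxEventTime, eventTime);
    long bucket = Math.floorDiv(eventTime, bucketSpanMillis);
    long oldestLiveBucket = Math.floorDiv(maxEventTime - expiryMillis, bucketSpanMillis);
    // Drop buckets that have fallen behind the expiry horizon.
    buckets.keySet().removeIf(b -> b < oldestLiveBucket);

    // Step 1: an event whose bucket is already expired is classified as expired.
    if (bucket < oldestLiveBucket) {
      return Result.EXPIRED;
    }
    // Step 2: look the key up in the bucket identified by the event time.
    Set<K> keys = buckets.computeIfAbsent(bucket, b -> new HashSet<>());
    if (!keys.add(key)) {
      return Result.DUPLICATE;
    }
    // Step 3: otherwise the event is unique; add() has already recorded its key.
    return Result.UNIQUE;
  }

  public static void main(String[] args)
  {
    DedupSketch<String> dedup = new DedupSketch<>(1000, 10_000);
    System.out.println(dedup.process("a", 50_000)); // UNIQUE
    System.out.println(dedup.process("a", 50_500)); // DUPLICATE
    System.out.println(dedup.process("a", 30_000)); // EXPIRED
    System.out.println(dedup.process("b", 50_200)); // UNIQUE
  }
}
```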
[GitHub] apex-malhar pull request #335: [Review Only] APEXMALHAR-1701 Deduper with Ma...

2016-08-02 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73161100
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java ---
@@ -0,0 +1,459 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
+import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Futures;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Abstract class which allows de-duplicating incoming tuples based on a configured key.
+ * Also supports an expiry mechanism based on a configurable expiry period configured using {@link TimeBucketAssigner}
+ * in {@link ManagedTimeUnifiedStateImpl}.
+ * The following steps are used in identifying the state of a particular tuple:
+ * 1. Check if the time of the tuple falls in an expired bucket. If so, the tuple is expired.
+ * 2. If the tuple is a valid event, it is checked in the store whether the same key already exists in the
+ * time bucket identified by the event time. If so, the tuple is a duplicate.
+ * 3. Otherwise the tuple is a unique tuple.
+ *
+ * @param  type of events
+ */
+@Evolving
+@OperatorAnnotation(checkpointableWithinAppWindow = false)
+public abstract class AbstractDeduper
+implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener
+{
+  /**
+   * The input port on which events are received.
+   */
+  public final transient DefaultInputPort input = new DefaultInputPort()
+  {
+@Override
+public final void process(T tuple)
+{
+  processTuple(tuple);
+}
+  };
+
+  /**
+   * The output port on which deduped events are emitted.
+   */
+  public final transient DefaultOutputPort unique = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which duplicate events are emitted.
+   */
+  public final transient DefaultOutputPort duplicate = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which expired events are emitted.
+   */
+  public final transient DefaultOutputPort expired = new DefaultOutputPort<>();
+
+  /**
+   * Whether or not the order of tuples should be maintained.
+   * Making this "true" might entail some cost in performance, but makes the operator idempotent.
+   */
+  private boolean preserveTupleOrder = true;
+
+  @NotNull
+  protected final ManagedTimeUnifiedStateImpl managedState = new ManagedTimeUnifiedStateImpl();
+
+  /**
+   * Map to hold the result of a tuple processing (unique, duplicate, expired or error) until previous
+   * tuples get processed. This is used only when {@link #preserveTupleOrder} is

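The javadoc on `preserveTupleOrder` and the result map describes holding each tuple's result until earlier tuples have been processed. A minimal sketch of that ordering rule, assuming results complete asynchronously and are modelled with `CompletableFuture` (not necessarily what the operator uses): results are queued in arrival order and released only from the head, so a later tuple that finishes first waits behind an earlier one. `OrderedEmitter` is hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class OrderedEmitter<R>
{
  // Result slots in tuple arrival order.
  private final Deque<CompletableFuture<R>> pending = new ArrayDeque<>();

  public void enqueue(CompletableFuture<R> result)
  {
    pending.addLast(result);
  }

  // Release completed results from the head only; stop at the first unfinished one.
  public List<R> drain()
  {
    List<R> emitted = new ArrayList<>();
    while (!pending.isEmpty() && pending.peekFirst().isDone()) {
      emitted.add(pending.pollFirst().join());
    }
    return emitted;
  }

  public static void main(String[] args)
  {
    OrderedEmitter<String> emitter = new OrderedEmitter<>();
    CompletableFuture<String> first = new CompletableFuture<>();
    emitter.enqueue(first);
    emitter.enqueue(CompletableFuture.completedFuture("t2:UNIQUE"));
    System.out.println(emitter.drain()); // [] because t2 is ready but waits behind t1
    first.complete("t1:DUPLICATE");
    System.out.println(emitter.drain()); // [t1:DUPLICATE, t2:UNIQUE]
  }
}
```

A slot completed exceptionally would surface from `join()` as a `CompletionException`, which is one possible way to model the "error" outcome mentioned in the javadoc.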
[jira] [Commented] (APEXMALHAR-1701) Deduper : create a deduper backed by Managed State

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15404032#comment-15404032
 ] 

ASF GitHub Bot commented on APEXMALHAR-1701:


Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73160693
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java ---
@@ -0,0 +1,459 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
+import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Futures;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Abstract class which allows de-duplicating incoming tuples based on a configured key.
+ * Also supports an expiry mechanism based on a configurable expiry period configured using {@link TimeBucketAssigner}
+ * in {@link ManagedTimeUnifiedStateImpl}.
+ * The following steps are used in identifying the state of a particular tuple:
+ * 1. Check if the time of the tuple falls in an expired bucket. If so, the tuple is expired.
+ * 2. If the tuple is a valid event, it is checked in the store whether the same key already exists in the
+ * time bucket identified by the event time. If so, the tuple is a duplicate.
+ * 3. Otherwise the tuple is a unique tuple.
+ *
+ * @param  type of events
+ */
+@Evolving
+@OperatorAnnotation(checkpointableWithinAppWindow = false)
+public abstract class AbstractDeduper
+implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener
+{
+  /**
+   * The input port on which events are received.
+   */
+  public final transient DefaultInputPort input = new DefaultInputPort()
+  {
+@Override
+public final void process(T tuple)
+{
+  processTuple(tuple);
+}
+  };
+
+  /**
+   * The output port on which deduped events are emitted.
+   */
+  public final transient DefaultOutputPort unique = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which duplicate events are emitted.
+   */
+  public final transient DefaultOutputPort duplicate = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which expired events are emitted.
+   */
+  public final transient DefaultOutputPort expired = new DefaultOutputPort<>();
+
+  /**
+   * Whether or not the order of tuples should be maintained.
+   * Making this "true" might entail some cost in performance, but makes the operator idempotent.
+   */
+  private boolean preserveTupleOrder = true;
+
+  @NotNull
+  protected final ManagedTimeUnifiedStateImpl 

[jira] [Commented] (APEXMALHAR-1701) Deduper : create a deduper backed by Managed State

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15404030#comment-15404030
 ] 

ASF GitHub Bot commented on APEXMALHAR-1701:


Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73160639
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java ---
@@ -0,0 +1,459 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
+import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Futures;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Abstract class which allows de-duplicating incoming tuples based on a configured key.
+ * Also supports an expiry mechanism based on a configurable expiry period configured using {@link TimeBucketAssigner}
+ * in {@link ManagedTimeUnifiedStateImpl}.
+ * The following steps are used in identifying the state of a particular tuple:
+ * 1. Check if the time of the tuple falls in an expired bucket. If so, the tuple is expired.
+ * 2. If the tuple is a valid event, it is checked in the store whether the same key already exists in the
+ * time bucket identified by the event time. If so, the tuple is a duplicate.
+ * 3. Otherwise the tuple is a unique tuple.
+ *
+ * @param  type of events
+ */
+@Evolving
+@OperatorAnnotation(checkpointableWithinAppWindow = false)
+public abstract class AbstractDeduper
+implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener
+{
+  /**
+   * The input port on which events are received.
+   */
+  public final transient DefaultInputPort input = new DefaultInputPort()
+  {
+@Override
+public final void process(T tuple)
+{
+  processTuple(tuple);
+}
+  };
+
+  /**
+   * The output port on which deduped events are emitted.
+   */
+  public final transient DefaultOutputPort unique = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which duplicate events are emitted.
+   */
+  public final transient DefaultOutputPort duplicate = new 
DefaultOutputPort<>();
+
+  /**
+   * The output port on which expired events are emitted.
+   */
+  public final transient DefaultOutputPort expired = new 
DefaultOutputPort<>();
+
+  /**
+   * Whether the order of tuples should be maintained.
+   * Setting this to "true" may cost some performance, but makes the operator idempotent.
+   */
+  private boolean preserveTupleOrder = true;
+
+  @NotNull
+  protected final ManagedTimeUnifiedStateImpl 
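
The three-step check described in the `AbstractDeduper` javadoc above (expired, then duplicate within the event's time bucket, else unique) can be sketched in memory as follows. This is a hedged illustration, not the Malhar code: the class `DedupSketch`, its `Decision` enum, and the expiry rule used here (a bucket expires once it lags more than `expireBefore` behind the largest event time seen so far) are assumptions, whereas the real operator delegates bucketing and expiry to `TimeBucketAssigner` and `ManagedTimeUnifiedStateImpl`. The values 60 and 600 mirror `setBucketSpan(60)` and `setExpireBefore(600)` in the partitioning test quoted below; their units are not stated there.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory model of the expired / duplicate / unique decision.
public class DedupSketch
{
  public enum Decision { UNIQUE, DUPLICATE, EXPIRED }

  private final long bucketSpan;   // width of one time bucket, same unit as event time (assumed)
  private final long expireBefore; // how far a bucket may lag behind the newest event time (assumed rule)
  private long maxEventTime = Long.MIN_VALUE;
  private final Map<Long, Set<Object>> keysPerBucket = new HashMap<>();

  public DedupSketch(long bucketSpan, long expireBefore)
  {
    this.bucketSpan = bucketSpan;
    this.expireBefore = expireBefore;
  }

  public Decision process(Object key, long eventTime)
  {
    maxEventTime = Math.max(maxEventTime, eventTime);
    long bucket = Math.floorDiv(eventTime, bucketSpan);
    long oldestLiveBucket = Math.floorDiv(maxEventTime - expireBefore, bucketSpan);
    // Forget keys of expired buckets (this sketch's stand-in for purging old state).
    keysPerBucket.keySet().removeIf(b -> b < oldestLiveBucket);

    // Step 1: the event time falls in an expired bucket.
    if (bucket < oldestLiveBucket) {
      return Decision.EXPIRED;
    }
    // Step 2: the key was already seen in this event's time bucket (Set.add returns false).
    if (!keysPerBucket.computeIfAbsent(bucket, b -> new HashSet<>()).add(key)) {
      return Decision.DUPLICATE;
    }
    // Step 3: otherwise the tuple is unique.
    return Decision.UNIQUE;
  }

  public static void main(String[] args)
  {
    DedupSketch deduper = new DedupSketch(60, 600);
    System.out.println(deduper.process("a", 1000)); // UNIQUE    (bucket 16)
    System.out.println(deduper.process("a", 1010)); // DUPLICATE (same key, bucket 16)
    System.out.println(deduper.process("a", 1100)); // UNIQUE    (bucket 18)
    System.out.println(deduper.process("b", 300));  // EXPIRED   (bucket 5 < floorDiv(1100 - 600, 60) = 8)
  }
}
```

Keeping a `HashSet` of keys per bucket makes each check constant-time on average, but unlike managed state it keeps everything on the heap, and the purge scans all live buckets on every call; both are simplifications acceptable only in a sketch.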

[jira] [Commented] (APEXMALHAR-1701) Deduper : create a deduper backed by Managed State

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15404029#comment-15404029
 ] 

ASF GitHub Bot commented on APEXMALHAR-1701:


Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73160600
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java ---
@@ -0,0 +1,459 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
+import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Futures;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Abstract class which allows de-duplicating incoming tuples based on a configured key.
+ * Also supports an expiry mechanism based on a configurable expiry period configured using {@link TimeBucketAssigner}
+ * in {@link ManagedTimeUnifiedStateImpl}.
+ * The following steps are used to identify the state of a particular tuple:
+ * 1. Check whether the time of the tuple falls in an expired bucket. If so, the tuple is expired.
+ * 2. If the tuple is a valid event, check in the store whether the same key already exists in the
+ * time bucket identified by the event time. If so, the tuple is a duplicate.
+ * 3. Otherwise the tuple is a unique tuple.
+ *
+ * @param <T> type of events
+ */
+@Evolving
+@OperatorAnnotation(checkpointableWithinAppWindow = false)
+public abstract class AbstractDeduper<T>
+    implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener
+{
+  /**
+   * The input port on which events are received.
+   */
+  public final transient DefaultInputPort<T> input = new DefaultInputPort<T>()
+  {
+    @Override
+    public final void process(T tuple)
+    {
+      processTuple(tuple);
+    }
+  };
+
+  /**
+   * The output port on which deduped events are emitted.
+   */
+  public final transient DefaultOutputPort<T> unique = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which duplicate events are emitted.
+   */
+  public final transient DefaultOutputPort<T> duplicate = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which expired events are emitted.
+   */
+  public final transient DefaultOutputPort<T> expired = new DefaultOutputPort<>();
+
+  /**
+   * Whether the order of tuples should be maintained.
+   * Setting this to "true" may cost some performance, but makes the operator idempotent.
+   */
+  private boolean preserveTupleOrder = true;
+
+  @NotNull
+  protected final ManagedTimeUnifiedStateImpl 

[GitHub] apex-malhar pull request #335: [Review Only] APEXMALHAR-1701 Deduper with Ma...

2016-08-02 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73160600
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java ---
@@ -0,0 +1,459 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
+import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Futures;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Abstract class which allows de-duplicating incoming tuples based on a configured key.
+ * Also supports an expiry mechanism based on a configurable expiry period configured using {@link TimeBucketAssigner}
+ * in {@link ManagedTimeUnifiedStateImpl}.
+ * The following steps are used to identify the state of a particular tuple:
+ * 1. Check whether the time of the tuple falls in an expired bucket. If so, the tuple is expired.
+ * 2. If the tuple is a valid event, check in the store whether the same key already exists in the
+ * time bucket identified by the event time. If so, the tuple is a duplicate.
+ * 3. Otherwise the tuple is a unique tuple.
+ *
+ * @param <T> type of events
+ */
+@Evolving
+@OperatorAnnotation(checkpointableWithinAppWindow = false)
+public abstract class AbstractDeduper<T>
+    implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener
+{
+  /**
+   * The input port on which events are received.
+   */
+  public final transient DefaultInputPort<T> input = new DefaultInputPort<T>()
+  {
+    @Override
+    public final void process(T tuple)
+    {
+      processTuple(tuple);
+    }
+  };
+
+  /**
+   * The output port on which deduped events are emitted.
+   */
+  public final transient DefaultOutputPort<T> unique = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which duplicate events are emitted.
+   */
+  public final transient DefaultOutputPort<T> duplicate = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which expired events are emitted.
+   */
+  public final transient DefaultOutputPort<T> expired = new DefaultOutputPort<>();
+
+  /**
+   * Whether the order of tuples should be maintained.
+   * Setting this to "true" may cost some performance, but makes the operator idempotent.
+   */
+  private boolean preserveTupleOrder = true;
+
+  @NotNull
+  protected final ManagedTimeUnifiedStateImpl managedState = new ManagedTimeUnifiedStateImpl();
+
+  /**
+   * Map to hold the result of processing a tuple (unique, duplicate, expired or error) until previous
+   * tuples get processed. This is used only when {@link #preserveTupleOrder} is
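
The `preserveTupleOrder` flag and the map that holds each tuple's result "until previous tuples get processed" amount to releasing decisions strictly in arrival order. A minimal sketch under stated assumptions: `OrderedEmitter`, `drain()` and the `ERROR` label are hypothetical, and each pending decision is modeled as a `java.util.concurrent.Future` only because the quoted diff imports `Future` and Guava's `Futures`. A decision that is already complete still waits behind an earlier one that is not.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical sketch: release per-tuple decisions strictly in arrival order.
public class OrderedEmitter<T, R>
{
  private static final class Pending<T, R>
  {
    final T tuple;
    final Future<R> decision;

    Pending(T tuple, Future<R> decision)
    {
      this.tuple = tuple;
      this.decision = decision;
    }
  }

  // The deque preserves arrival order; equal tuples get separate entries.
  private final Deque<Pending<T, R>> pending = new ArrayDeque<>();
  private final List<String> emitted = new ArrayList<>();

  public void add(T tuple, Future<R> decision)
  {
    pending.addLast(new Pending<>(tuple, decision));
  }

  // Emit finished decisions from the head; stop at the first unfinished one.
  public void drain()
  {
    while (!pending.isEmpty() && pending.peekFirst().decision.isDone()) {
      Pending<T, R> head = pending.pollFirst();
      try {
        emitted.add(head.tuple + "=" + head.decision.get()); // does not block: isDone() is true
      } catch (ExecutionException e) {
        emitted.add(head.tuple + "=ERROR"); // the lookup itself failed
      } catch (InterruptedException e) {
        pending.addFirst(head); // keep the order and retry on the next drain
        Thread.currentThread().interrupt();
        return;
      }
    }
  }

  public List<String> emitted()
  {
    return emitted;
  }

  public int pendingCount()
  {
    return pending.size();
  }

  public static void main(String[] args)
  {
    OrderedEmitter<String, String> emitter = new OrderedEmitter<>();
    CompletableFuture<String> slowLookup = new CompletableFuture<>();
    emitter.add("t1", slowLookup);
    emitter.add("t2", CompletableFuture.completedFuture("UNIQUE"));
    emitter.drain();
    System.out.println(emitter.emitted()); // [] because t2 waits behind t1
    slowLookup.complete("DUPLICATE");
    emitter.drain();
    System.out.println(emitter.emitted()); // [t1=DUPLICATE, t2=UNIQUE]
  }
}
```

A deque is used rather than a map keyed by the tuple, so two equal tuples (exactly the input a deduper exists for) keep separate pending entries. Because `AbstractDeduper` implements `Operator.IdleTimeHandler`, `handleIdleTime()` is one plausible place to call such a drain, though the quoted excerpt does not show where the operator actually does it.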

[GitHub] apex-malhar pull request #335: [Review Only] APEXMALHAR-1701 Deduper with Ma...

2016-08-02 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73160639
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java ---
@@ -0,0 +1,459 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
+import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Futures;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Abstract class which allows de-duplicating incoming tuples based on a configured key.
+ * Also supports an expiry mechanism based on a configurable expiry period configured using {@link TimeBucketAssigner}
+ * in {@link ManagedTimeUnifiedStateImpl}.
+ * The following steps are used to identify the state of a particular tuple:
+ * 1. Check whether the time of the tuple falls in an expired bucket. If so, the tuple is expired.
+ * 2. If the tuple is a valid event, check in the store whether the same key already exists in the
+ * time bucket identified by the event time. If so, the tuple is a duplicate.
+ * 3. Otherwise the tuple is a unique tuple.
+ *
+ * @param <T> type of events
+ */
+@Evolving
+@OperatorAnnotation(checkpointableWithinAppWindow = false)
+public abstract class AbstractDeduper<T>
+    implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener
+{
+  /**
+   * The input port on which events are received.
+   */
+  public final transient DefaultInputPort<T> input = new DefaultInputPort<T>()
+  {
+    @Override
+    public final void process(T tuple)
+    {
+      processTuple(tuple);
+    }
+  };
+
+  /**
+   * The output port on which deduped events are emitted.
+   */
+  public final transient DefaultOutputPort<T> unique = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which duplicate events are emitted.
+   */
+  public final transient DefaultOutputPort<T> duplicate = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which expired events are emitted.
+   */
+  public final transient DefaultOutputPort<T> expired = new DefaultOutputPort<>();
+
+  /**
+   * Whether the order of tuples should be maintained.
+   * Setting this to "true" may cost some performance, but makes the operator idempotent.
+   */
+  private boolean preserveTupleOrder = true;
+
+  @NotNull
+  protected final ManagedTimeUnifiedStateImpl managedState = new ManagedTimeUnifiedStateImpl();
+
+  /**
+   * Map to hold the result of processing a tuple (unique, duplicate, expired or error) until previous
+   * tuples get processed. This is used only when {@link #preserveTupleOrder} is

[GitHub] apex-malhar pull request #335: [Review Only] APEXMALHAR-1701 Deduper with Ma...

2016-08-02 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73160450
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java ---
@@ -0,0 +1,459 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
+import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Futures;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Abstract class which allows de-duplicating incoming tuples based on a configured key.
+ * Also supports an expiry mechanism based on a configurable expiry period configured using {@link TimeBucketAssigner}
+ * in {@link ManagedTimeUnifiedStateImpl}.
+ * The following steps are used to identify the state of a particular tuple:
+ * 1. Check whether the time of the tuple falls in an expired bucket. If so, the tuple is expired.
+ * 2. If the tuple is a valid event, check in the store whether the same key already exists in the
+ * time bucket identified by the event time. If so, the tuple is a duplicate.
+ * 3. Otherwise the tuple is a unique tuple.
+ *
+ * @param <T> type of events
+ */
+@Evolving
+@OperatorAnnotation(checkpointableWithinAppWindow = false)
+public abstract class AbstractDeduper<T>
+    implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener
+{
+  /**
+   * The input port on which events are received.
+   */
+  public final transient DefaultInputPort<T> input = new DefaultInputPort<T>()
+  {
+    @Override
+    public final void process(T tuple)
+    {
+      processTuple(tuple);
+    }
+  };
+
+  /**
+   * The output port on which deduped events are emitted.
+   */
+  public final transient DefaultOutputPort<T> unique = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which duplicate events are emitted.
+   */
+  public final transient DefaultOutputPort<T> duplicate = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which expired events are emitted.
+   */
+  public final transient DefaultOutputPort<T> expired = new DefaultOutputPort<>();
+
+  /**
+   * Whether the order of tuples should be maintained.
+   * Setting this to "true" may cost some performance, but makes the operator idempotent.
+   */
+  private boolean preserveTupleOrder = true;
+
+  @NotNull
+  protected final ManagedTimeUnifiedStateImpl managedState = new ManagedTimeUnifiedStateImpl();
+
+  /**
+   * Map to hold the result of processing a tuple (unique, duplicate, expired or error) until previous
+   * tuples get processed. This is used only when {@link #preserveTupleOrder} is

[jira] [Commented] (APEXMALHAR-1701) Deduper : create a deduper backed by Managed State

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403976#comment-15403976
 ] 

ASF GitHub Bot commented on APEXMALHAR-1701:


Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73156746
  
--- Diff: 
library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperPartitioningTest.java
 ---
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Random;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.Maps;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.common.partitioner.StatelessPartitioner;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+
+/**
+ * Tests whether the operator functions correctly when partitioned.
+ * The partitioning in Dedup is overridden by partitioning on the basis of the key in the tuple.
+ *
+ */
+public class DeduperPartitioningTest
+{
+  public static final int NUM_DEDUP_PARTITIONS = 5;
+  private static boolean testFailed = false;
+
+  /**
+   * Application to test the partitioning
+   *
+   */
+  public static class TestDedupApp implements StreamingApplication
+  {
+    @Override
+    public void populateDAG(DAG dag, Configuration conf)
+    {
+      TestGenerator gen = dag.addOperator("Generator", new TestGenerator());
+
+      TestDeduper dedup = dag.addOperator("Deduper", new TestDeduper());
+      dedup.setKeyExpression("id");
+      dedup.setTimeExpression("eventTime.getTime()");
+      dedup.setBucketSpan(60);
+      dedup.setExpireBefore(600);
+
+      ConsoleOutputOperator console = dag.addOperator("Console", new ConsoleOutputOperator());
+      dag.addStream("Generator to Dedup", gen.output, dedup.input);
+      dag.addStream("Dedup to Console", dedup.unique, console.input);
+      dag.setInputPortAttribute(dedup.input, Context.PortContext.TUPLE_CLASS, TestEvent.class);
+      dag.setOutputPortAttribute(dedup.unique, Context.PortContext.TUPLE_CLASS, TestEvent.class);
+      dag.setAttribute(dedup, Context.OperatorContext.PARTITIONER,
+          new StatelessPartitioner(NUM_DEDUP_PARTITIONS));
+    }
+  }
+
+  public static class TestDeduper extends TimeBasedDedupOperator
+  {
+    int operatorId;
+    boolean started = false;
+    HashMap<Integer, Integer> partitionMap = Maps.newHashMap();
+
+    @Override
+    public void setup(OperatorContext context)
+    {
+      super.setup(context);
+      operatorId = context.getId();
+    }
+
+    @Override
+    protected void processTuple(Object tuple)
+    {
+      TestEvent event = (TestEvent)tuple;
+      if (partitionMap.containsKey(event.id)) {
+        if (partitionMap.get(event.id) != operatorId) {
+          testFailed = true;
+          throw new RuntimeException("Wrong tuple assignment");
+        }
+      } else {
+        partitionMap.put(event.id, operatorId);
+      }
+    }
+  }
+
+  public static class TestGenerator extends BaseOperator implements InputOperator
+  {
+
+    public final transient DefaultOutputPort output = new DefaultOutputPort<>();
+    private final transient Random r = new Random();
+
+    @Override
+    public void 

[GitHub] apex-malhar pull request #335: [Review Only] APEXMALHAR-1701 Deduper with Ma...

2016-08-02 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73156746
  
--- Diff: 
library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperPartitioningTest.java
 ---
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Random;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.Maps;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.common.partitioner.StatelessPartitioner;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+
+/**
+ * Tests whether the operator functions correctly when partitioned.
+ * The partitioning in Dedup is overridden by partitioning on the basis of the key in the tuple.
+ *
+ */
+public class DeduperPartitioningTest
+{
+  public static final int NUM_DEDUP_PARTITIONS = 5;
+  private static boolean testFailed = false;
+
+  /**
+   * Application to test the partitioning
+   *
+   */
+  public static class TestDedupApp implements StreamingApplication
+  {
+    @Override
+    public void populateDAG(DAG dag, Configuration conf)
+    {
+      TestGenerator gen = dag.addOperator("Generator", new TestGenerator());
+
+      TestDeduper dedup = dag.addOperator("Deduper", new TestDeduper());
+      dedup.setKeyExpression("id");
+      dedup.setTimeExpression("eventTime.getTime()");
+      dedup.setBucketSpan(60);
+      dedup.setExpireBefore(600);
+
+      ConsoleOutputOperator console = dag.addOperator("Console", new ConsoleOutputOperator());
+      dag.addStream("Generator to Dedup", gen.output, dedup.input);
+      dag.addStream("Dedup to Console", dedup.unique, console.input);
+      dag.setInputPortAttribute(dedup.input, Context.PortContext.TUPLE_CLASS, TestEvent.class);
+      dag.setOutputPortAttribute(dedup.unique, Context.PortContext.TUPLE_CLASS, TestEvent.class);
+      dag.setAttribute(dedup, Context.OperatorContext.PARTITIONER,
+          new StatelessPartitioner(NUM_DEDUP_PARTITIONS));
+    }
+  }
+
+  public static class TestDeduper extends TimeBasedDedupOperator
+  {
+    int operatorId;
+    boolean started = false;
+    HashMap<Integer, Integer> partitionMap = Maps.newHashMap();
+
+    @Override
+    public void setup(OperatorContext context)
+    {
+      super.setup(context);
+      operatorId = context.getId();
+    }
+
+    @Override
+    protected void processTuple(Object tuple)
+    {
+      TestEvent event = (TestEvent)tuple;
+      if (partitionMap.containsKey(event.id)) {
+        if (partitionMap.get(event.id) != operatorId) {
+          testFailed = true;
+          throw new RuntimeException("Wrong tuple assignment");
+        }
+      } else {
+        partitionMap.put(event.id, operatorId);
+      }
+    }
+  }
+
+  public static class TestGenerator extends BaseOperator implements InputOperator
+  {
+
+    public final transient DefaultOutputPort output = new DefaultOutputPort<>();
+    private final transient Random r = new Random();
+
+    @Override
+    public void emitTuples()
+    {
+      TestEvent event = new TestEvent();
+      event.id = r.nextInt(100);
+      output.emit(event);
+    }
+  }
+
+  public static class TestEvent
+  {
+    private int id;
+    private Date 
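
`DeduperPartitioningTest` asserts that a given key is always processed by the same partition. The sketch below models that invariant outside Apex. `KeyPartitionCheck` and `partitionFor` (a `Math.floorMod` of the key's `hashCode()`) are hypothetical stand-ins, not Apex's partitioning API; `record` mirrors the `partitionMap` check in `TestDeduper`. In the quoted test `partitionMap` is an instance field, so each partition only ever stores its own `operatorId`; catching a key that reaches two partitions appears to need a map shared by all partitions, which the single `KeyPartitionCheck` instance here stands in for.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

// Hypothetical model of the invariant checked by DeduperPartitioningTest.
public class KeyPartitionCheck
{
  // Assumed key-based assignment: the same key always maps to the same partition.
  public static int partitionFor(Object key, int numPartitions)
  {
    return Math.floorMod(key.hashCode(), numPartitions);
  }

  // key -> partition that first processed it, shared by all partitions
  private final Map<Object, Integer> owner = new HashMap<>();

  // Returns false if the key was already seen on a different partition.
  public boolean record(Object key, int partitionId)
  {
    Integer previous = owner.putIfAbsent(key, partitionId);
    return previous == null || previous == partitionId;
  }

  public static void main(String[] args)
  {
    final int numPartitions = 5; // mirrors NUM_DEDUP_PARTITIONS in the test
    KeyPartitionCheck check = new KeyPartitionCheck();
    Random r = new Random();
    boolean consistent = true;
    for (int i = 0; i < 10_000; i++) {
      int id = r.nextInt(100); // same key range as TestGenerator
      consistent &= check.record(id, partitionFor(id, numPartitions));
    }
    System.out.println("key-based routing consistent: " + consistent); // prints "key-based routing consistent: true"
    // Simulate a misrouted tuple: key 7 arriving on a partition other than its owner.
    check.record(7, partitionFor(7, numPartitions));
    System.out.println(check.record(7, (partitionFor(7, numPartitions) + 1) % numPartitions)); // prints false
  }
}
```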

[jira] [Commented] (APEXMALHAR-1701) Deduper : create a deduper backed by Managed State

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403963#comment-15403963
 ] 

ASF GitHub Bot commented on APEXMALHAR-1701:


Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73155029
  
--- Diff: 
library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperIdempotencyTest.java
 ---
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.io.IOException;
+import java.util.Date;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+
+public class DeduperIdempotencyTest
+{
+  public static boolean testFailed = false;
+
+  @Test
+  public void testApplication() throws IOException, Exception
+  {
+    try {
+      LocalMode lma = LocalMode.newInstance();
+      Configuration conf = new Configuration(false);
+      lma.prepareDAG(new DeduperIdempotencyTestApp(), conf);
+      LocalMode.Controller lc = lma.getController();
+      lc.runAsync();
--- End diff --

done


> Deduper : create a deduper backed by Managed State
> --
>
> Key: APEXMALHAR-1701
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-1701
> Project: Apache Apex Malhar
>  Issue Type: Task
>  Components: algorithms
>Reporter: Chandni Singh
>Assignee: Chandni Singh
>
> Need a de-deduplicator operator that is based on Managed State.





[GitHub] apex-malhar pull request #335: [Review Only] APEXMALHAR-1701 Deduper with Ma...

2016-08-02 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73155029
  
--- Diff: 
library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperIdempotencyTest.java
 ---
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.io.IOException;
+import java.util.Date;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+
+public class DeduperIdempotencyTest
+{
+  public static boolean testFailed = false;
+
+  @Test
+  public void testApplication() throws IOException, Exception
+  {
+    try {
+      LocalMode lma = LocalMode.newInstance();
+      Configuration conf = new Configuration(false);
+      lma.prepareDAG(new DeduperIdempotencyTestApp(), conf);
+      LocalMode.Controller lc = lma.getController();
+      lc.runAsync();
--- End diff --

done


[jira] [Commented] (APEXMALHAR-2100) Development of Inner Join Operator using Spillable Datastructures

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403891#comment-15403891
 ] 

ASF GitHub Bot commented on APEXMALHAR-2100:


Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73145124
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/join/AbstractInnerJoinOperator.java 
---
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.util.Arrays;
+import java.util.List;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent;
+import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableComplexComponent;
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * 
+ * An abstract implementation of inner join operator. Operator receives tuples from two streams,
+ * applies the join operation based on constraint and emit the joined value.
+ * Concrete classes should provide implementation to extractKey, extractTime, mergeTuples methods.
+ *
+ * Properties:
+ * includeFieldStr: List of comma separated fields to be added to the output tuple.
+ * Ex: Field1,Field2;Field3,Field4
+ * keyFields: List of comma separated key field for both the streams. Ex: Field1,Field2
+ * timeFields: List of comma separated time field for both the streams. Ex: Field1,Field2
+ * expiryTime: Expiry time for stored tuples
+ * isStream1KeyPrimary: : Specifies whether the stream1 key is primary or not
+ * isStream2KeyPrimary: : Specifies whether the stream2 key is primary or not
+ *
+ *  Example:
+ *  Left input port receives customer details and right input port receives Order details.
+ *  Schema for the Customer be in the form of {ID, Name, CTime}
+ *  Schema for the Order be in the form of {OID, CID, OTime}
+ *  Now, Join the tuples of Customer and Order streams where Customer.ID = Order.CID and the constraint is
+ *  matched tuples must have timestamp within 5 minutes.
+ *  Here, key Fields = ID, CID and Time Fields = CTime, OTime, expiryTime = 5 minutes
+ *
+ *  @displayName Abstract Inner Join Operator
+ *  @tags join
+ */
+public abstract class AbstractInnerJoinOperator extends BaseOperator
+{
+  protected transient String[][] includeFields;
+  protected transient List keyFields;
+  protected transient List timeFields;
+  @AutoMetric
+  private long tuplesJoinedPerSec;
+  private double windowTimeSec;
+  private int tuplesCount;
+  @NotNull
+  private String keyFieldsStr;
+  @NotNull
+  private String includeFieldStr;
+  private String timeFieldsStr;
+  private Long stream1ExpiryTime;
+  private Long stream2ExpiryTime;
+  private boolean isStream1KeyPrimary = true;
+  private boolean isStream2KeyPrimary = true;
+  protected SpillableComplexComponent component;
+  protected Spillable.SpillableByteArrayListMultimap stream1Data;
+  protected Spillable.SpillableByteArrayListMultimap stream2Data;
+
+  /**
+   * Process the tuple which are received from input ports with the following steps:
+   * 1) Extract key from the given tuple
+   * 2) Insert  into the store where store is the stream1Data if the tuple
+   * receives from stream1 or viceversa.
+   * 3) Get the values of the key if found it in opposite store
+   * 4) Merge the given tuple and values found from step (3)
+   * @param tuple given tuple
+   * @param isStream1Data Specifies whether the given tuple belongs to stream1 or not.
+   */
+  

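The property formats listed in the AbstractInnerJoinOperator javadoc above (comma-separated field names, with `includeFieldStr` also using a semicolon) could be turned into the `includeFields` and `keyFields` members roughly as follows. This is a minimal sketch, not code from the pull request: `JoinFieldParser` and its methods are hypothetical, and reading the semicolon as the stream1/stream2 separator and `keyFields`/`timeFields` as "one field per stream" are assumptions drawn from the javadoc examples.

```java
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical helper (not from the pull request) that parses the
 * comma/semicolon property strings described in the javadoc.
 */
final class JoinFieldParser
{
  private JoinFieldParser()
  {
  }

  /**
   * Parses "Field1,Field2;Field3,Field4" into {{Field1, Field2}, {Field3, Field4}}.
   * Assumption: ';' separates stream1's fields from stream2's, ',' separates fields.
   */
  static String[][] parseIncludeFields(String includeFieldStr)
  {
    String[] perStream = includeFieldStr.split(";");
    if (perStream.length != 2) {
      throw new IllegalArgumentException("expected two ';'-separated field lists: " + includeFieldStr);
    }
    return new String[][] {splitAndTrim(perStream[0]), splitAndTrim(perStream[1])};
  }

  /**
   * Parses a keyFields/timeFields value such as "Field1,Field2".
   * Assumption: exactly one field per stream, stream1 first.
   */
  static List<String> parsePerStreamFields(String fieldsStr)
  {
    List<String> fields = Arrays.asList(splitAndTrim(fieldsStr));
    if (fields.size() != 2) {
      throw new IllegalArgumentException("expected one field per stream: " + fieldsStr);
    }
    return fields;
  }

  /** Splits on ',' and strips surrounding whitespace from each field name. */
  private static String[] splitAndTrim(String csv)
  {
    String[] parts = csv.split(",");
    for (int i = 0; i < parts.length; i++) {
      parts[i] = parts[i].trim();
    }
    return parts;
  }
}
```

For the Customer/Order example, `parsePerStreamFields("ID,CID")` would yield the key fields `[ID, CID]`, and validating the shape up front surfaces a malformed property at setup time rather than on the first tuple.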
[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...

2016-08-02 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73145124
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/join/AbstractInnerJoinOperator.java 
---
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.util.Arrays;
+import java.util.List;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent;
+import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableComplexComponent;
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * 
+ * An abstract implementation of inner join operator. Operator receives tuples from two streams,
+ * applies the join operation based on constraint and emit the joined value.
+ * Concrete classes should provide implementation to extractKey, extractTime, mergeTuples methods.
+ *
+ * Properties:
+ * includeFieldStr: List of comma separated fields to be added to the output tuple.
+ * Ex: Field1,Field2;Field3,Field4
+ * keyFields: List of comma separated key field for both the streams. Ex: Field1,Field2
+ * timeFields: List of comma separated time field for both the streams. Ex: Field1,Field2
+ * expiryTime: Expiry time for stored tuples
+ * isStream1KeyPrimary: : Specifies whether the stream1 key is primary or not
+ * isStream2KeyPrimary: : Specifies whether the stream2 key is primary or not
+ *
+ *  Example:
+ *  Left input port receives customer details and right input port receives Order details.
+ *  Schema for the Customer be in the form of {ID, Name, CTime}
+ *  Schema for the Order be in the form of {OID, CID, OTime}
+ *  Now, Join the tuples of Customer and Order streams where Customer.ID = Order.CID and the constraint is
+ *  matched tuples must have timestamp within 5 minutes.
+ *  Here, key Fields = ID, CID and Time Fields = CTime, OTime, expiryTime = 5 minutes
+ *
+ *  @displayName Abstract Inner Join Operator
+ *  @tags join
+ */
+public abstract class AbstractInnerJoinOperator extends BaseOperator
+{
+  protected transient String[][] includeFields;
+  protected transient List keyFields;
+  protected transient List timeFields;
+  @AutoMetric
+  private long tuplesJoinedPerSec;
+  private double windowTimeSec;
+  private int tuplesCount;
+  @NotNull
+  private String keyFieldsStr;
+  @NotNull
+  private String includeFieldStr;
+  private String timeFieldsStr;
+  private Long stream1ExpiryTime;
+  private Long stream2ExpiryTime;
+  private boolean isStream1KeyPrimary = true;
+  private boolean isStream2KeyPrimary = true;
+  protected SpillableComplexComponent component;
+  protected Spillable.SpillableByteArrayListMultimap stream1Data;
+  protected Spillable.SpillableByteArrayListMultimap stream2Data;
+
+  /**
+   * Process the tuple which are received from input ports with the following steps:
+   * 1) Extract key from the given tuple
+   * 2) Insert  into the store where store is the stream1Data if the tuple
+   * receives from stream1 or viceversa.
+   * 3) Get the values of the key if found it in opposite store
+   * 4) Merge the given tuple and values found from step (3)
+   * @param tuple given tuple
+   * @param isStream1Data Specifies whether the given tuple belongs to stream1 or not.
+   */
+  protected void processTuple(T tuple, boolean isStream1Data)
+  {
+    Spillable.SpillableByteArrayListMultimap store = isStream1Data ? stream1Data : stream2Data;
+    K key = extractKey(tuple,isStream1Data);
+    if (!store.put(key,

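The four steps in the `processTuple` javadoc, together with the Customer/Order example and its 5-minute constraint, can be illustrated with a plain in-memory sketch. It is not the operator from PR #330: two `HashMap`s stand in for the `Spillable.SpillableByteArrayListMultimap` stores, the class `InMemoryInnerJoin` and its constructor are hypothetical, both streams share one tuple type `T` (as the signature `processTuple(T tuple, boolean isStream1Data)` suggests), stored tuples are never evicted, and the `isStream1KeyPrimary`/`isStream2KeyPrimary` flags are ignored.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

/**
 * Hypothetical in-memory inner join illustrating the processTuple steps.
 * HashMaps stand in for the spillable multimaps; nothing is ever evicted.
 */
final class InMemoryInnerJoin<K, T, O>
{
  private final Map<K, List<T>> stream1Data = new HashMap<>();
  private final Map<K, List<T>> stream2Data = new HashMap<>();
  private final Function<T, K> stream1Key;
  private final Function<T, K> stream2Key;
  private final Function<T, Long> stream1Time;
  private final Function<T, Long> stream2Time;
  private final long expiryTime;
  /** Builds one output from (stream1 tuple, stream2 tuple). */
  private final BiFunction<T, T, O> merge;

  InMemoryInnerJoin(Function<T, K> stream1Key, Function<T, K> stream2Key,
      Function<T, Long> stream1Time, Function<T, Long> stream2Time,
      long expiryTime, BiFunction<T, T, O> merge)
  {
    this.stream1Key = stream1Key;
    this.stream2Key = stream2Key;
    this.stream1Time = stream1Time;
    this.stream2Time = stream2Time;
    this.expiryTime = expiryTime;
    this.merge = merge;
  }

  /** Returns the joined outputs produced by this tuple (possibly none). */
  List<O> processTuple(T tuple, boolean isStream1Data)
  {
    // 1) Extract the key and time with the extractors of the tuple's own stream.
    K key = (isStream1Data ? stream1Key : stream2Key).apply(tuple);
    long time = (isStream1Data ? stream1Time : stream2Time).apply(tuple);

    // 2) Insert the tuple into its own stream's store.
    Map<K, List<T>> store = isStream1Data ? stream1Data : stream2Data;
    store.computeIfAbsent(key, k -> new ArrayList<>()).add(tuple);

    // 3) Look up tuples with the same key in the opposite store.
    Map<K, List<T>> opposite = isStream1Data ? stream2Data : stream1Data;
    Function<T, Long> oppositeTime = isStream1Data ? stream2Time : stream1Time;
    List<O> joined = new ArrayList<>();
    for (T match : opposite.getOrDefault(key, Collections.emptyList())) {
      // Time constraint: the two timestamps must be within expiryTime of each other.
      if (Math.abs(time - oppositeTime.apply(match)) <= expiryTime) {
        // 4) Merge, always passing the stream1 tuple first.
        joined.add(isStream1Data ? merge.apply(tuple, match) : merge.apply(match, tuple));
      }
    }
    return joined;
  }
}
```

With `expiryTime` set to 5 minutes, an order with `CID = 1` arriving 1 minute after customer `ID = 1` yields one joined tuple, while an order arriving 10 minutes after that customer yields none.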
[jira] [Commented] (APEXMALHAR-1701) Deduper : create a deduper backed by Managed State

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403872#comment-15403872
 ] 

ASF GitHub Bot commented on APEXMALHAR-1701:


Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73142681
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java ---
@@ -0,0 +1,459 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
+import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Futures;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Abstract class which allows de-duplicating incoming tuples based on a configured key.
+ * Also supports expiry mechanism based on a configurable expiry period configured using {@link TimeBucketAssigner}
+ * in {@link ManagedTimeUnifiedStateImpl}
+ * Following steps are used in identifying the state of a particular tuple:
+ * 1. Check if the time of the tuple falls in an expired bucket. If so, the tuple is expired
+ * 2. If the tuple is a valid event, it is checked in the store whether the same key already exists in the
+ * time bucket identified by the event time. If, so, the tuple is a duplicate.
+ * 3. Otherwise the tuple is a unique tuple.
+ *
+ * @param  type of events
+ */
+@Evolving
+@OperatorAnnotation(checkpointableWithinAppWindow = false)
+public abstract class AbstractDeduper
+    implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener
+{
+  /**
+   * The input port on which events are received.
+   */
+  public final transient DefaultInputPort input = new DefaultInputPort()
+  {
+    @Override
+    public final void process(T tuple)
+    {
+      processTuple(tuple);
+    }
+  };
+
+  /**
+   * The output port on which deduped events are emitted.
+   */
+  public final transient DefaultOutputPort unique = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which duplicate events are emitted.
+   */
+  public final transient DefaultOutputPort duplicate = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which expired events are emitted.
+   */
+  public final transient DefaultOutputPort expired = new DefaultOutputPort<>();
+
+  /**
+   * Whether or not the order of tuples be maintained.
+   * Making this "true" might entail some cost in performance, but makes the operator idempotent.
+   */
+  private boolean preserveTupleOrder = true;
+
+  @NotNull
+  protected final ManagedTimeUnifiedStateImpl 

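The three-step classification in the AbstractDeduper javadoc (expired, else duplicate within the event's time bucket, else unique) can be sketched without Managed State. In this hypothetical `BucketedDeduper`, a `HashMap` of per-bucket key sets replaces `ManagedTimeUnifiedStateImpl`, buckets are fixed-width slices of event time, and a bucket counts as expired once it lies entirely more than `expireBeforeMillis` behind the newest event time seen. These bucketing rules are assumptions for illustration and are not necessarily how `TimeBucketAssigner` is configured.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical in-memory deduper that follows the three classification steps
 * from the AbstractDeduper javadoc, using fixed-width time buckets.
 */
final class BucketedDeduper
{
  enum Decision
  {
    UNIQUE, DUPLICATE, EXPIRED
  }

  private final long bucketSpanMillis;
  private final long expireBeforeMillis;
  /** Keys seen so far, grouped by bucket index (floor of eventTime / bucketSpanMillis). */
  private final Map<Long, Set<Object>> buckets = new HashMap<>();
  private long maxEventTime = Long.MIN_VALUE;

  BucketedDeduper(long bucketSpanMillis, long expireBeforeMillis)
  {
    this.bucketSpanMillis = bucketSpanMillis;
    this.expireBeforeMillis = expireBeforeMillis;
  }

  Decision process(Object key, long eventTime)
  {
    maxEventTime = Math.max(maxEventTime, eventTime);
    // Buckets below this index end at least expireBeforeMillis before the newest event.
    long oldestLiveBucket = Math.floorDiv(maxEventTime - expireBeforeMillis, bucketSpanMillis);
    // Drop expired buckets so memory stays bounded.
    buckets.keySet().removeIf(index -> index < oldestLiveBucket);

    long bucket = Math.floorDiv(eventTime, bucketSpanMillis);
    // Step 1: the tuple's time falls in an expired bucket.
    if (bucket < oldestLiveBucket) {
      return Decision.EXPIRED;
    }
    // Steps 2 and 3: duplicate if the key is already in this bucket, otherwise unique.
    return buckets.computeIfAbsent(bucket, unused -> new HashSet<>()).add(key)
        ? Decision.UNIQUE : Decision.DUPLICATE;
  }
}
```

In this sketch the same key arriving in a different bucket is reported as unique, which follows step 2's wording ("in the time bucket identified by the event time").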
[GitHub] apex-malhar pull request #335: [Review Only] APEXMALHAR-1701 Deduper with Ma...

2016-08-02 Thread chinmaykolhatkar
Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73142681
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java ---
@@ -0,0 +1,459 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
+import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Futures;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Abstract class which allows de-duplicating incoming tuples based on a configured key.
+ * Also supports expiry mechanism based on a configurable expiry period configured using {@link TimeBucketAssigner}
+ * in {@link ManagedTimeUnifiedStateImpl}
+ * Following steps are used in identifying the state of a particular tuple:
+ * 1. Check if the time of the tuple falls in an expired bucket. If so, the tuple is expired
+ * 2. If the tuple is a valid event, it is checked in the store whether the same key already exists in the
+ * time bucket identified by the event time. If, so, the tuple is a duplicate.
+ * 3. Otherwise the tuple is a unique tuple.
+ *
+ * @param  type of events
+ */
+@Evolving
+@OperatorAnnotation(checkpointableWithinAppWindow = false)
+public abstract class AbstractDeduper
+    implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener
+{
+  /**
+   * The input port on which events are received.
+   */
+  public final transient DefaultInputPort input = new DefaultInputPort()
+  {
+    @Override
+    public final void process(T tuple)
+    {
+      processTuple(tuple);
+    }
+  };
+
+  /**
+   * The output port on which deduped events are emitted.
+   */
+  public final transient DefaultOutputPort unique = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which duplicate events are emitted.
+   */
+  public final transient DefaultOutputPort duplicate = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which expired events are emitted.
+   */
+  public final transient DefaultOutputPort expired = new DefaultOutputPort<>();
+
+  /**
+   * Whether or not the order of tuples be maintained.
+   * Making this "true" might entail some cost in performance, but makes the operator idempotent.
+   */
+  private boolean preserveTupleOrder = true;
+
+  @NotNull
+  protected final ManagedTimeUnifiedStateImpl managedState = new ManagedTimeUnifiedStateImpl();
+
+  /**
+   * Map to hold the result of a tuple processing (unique, duplicate, expired or error) until previous
+   * tuples get processed. This is used only when {@link #preserveTupleOrder}

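The `preserveTupleOrder` flag and the map described in the javadoc above (holding each tuple's result until earlier tuples are processed) amount to releasing asynchronously computed results in arrival order. Below is a minimal sketch of that idea, assuming each tuple's decision arrives as a `java.util.concurrent.Future`. The class `OrderPreservingBuffer` and its `drain` method are hypothetical and not part of the PR, and unlike the PR's map this sketch uses a queue so that identical tuples, which a deduper must see individually, are not collapsed into one entry.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;

/**
 * Hypothetical buffer that releases tuples strictly in arrival order, each
 * with its asynchronously computed decision (e.g. unique/duplicate/expired).
 */
final class OrderPreservingBuffer<T, D>
{
  private static final class Pending<A, B>
  {
    final A tuple;
    final Future<B> decision;

    Pending(A tuple, Future<B> decision)
    {
      this.tuple = tuple;
      this.decision = decision;
    }
  }

  /** Arrival order; a queue rather than a map so that equal tuples stay distinct. */
  private final Deque<Pending<T, D>> pending = new ArrayDeque<>();

  void add(T tuple, Future<D> decision)
  {
    pending.addLast(new Pending<>(tuple, decision));
  }

  /** Emits every leading tuple whose decision is complete; stops at the first one still pending. */
  int drain(BiConsumer<T, D> emit)
  {
    int emitted = 0;
    while (!pending.isEmpty() && pending.peekFirst().decision.isDone()) {
      Pending<T, D> head = pending.pollFirst();
      emit.accept(head.tuple, getCompleted(head.decision));
      emitted++;
    }
    return emitted;
  }

  int pendingCount()
  {
    return pending.size();
  }

  private static <R> R getCompleted(Future<R> future)
  {
    try {
      return future.get(); // does not block: drain() only calls this once isDone() is true
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      // In this sketch a failed lookup is surfaced as an exception (the "error" outcome).
      throw new IllegalStateException("decision failed", e.getCause());
    }
  }
}
```

A single slow lookup at the head holds back every later tuple, even ones already decided, which is consistent with the javadoc's note that preserving order may cost some performance.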
[jira] [Commented] (APEXMALHAR-1701) Deduper : create a deduper backed by Managed State

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403871#comment-15403871
 ] 

ASF GitHub Bot commented on APEXMALHAR-1701:


Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73142614
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java ---
@@ -0,0 +1,459 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
+import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Futures;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Abstract class which allows de-duplicating incoming tuples based on a configured key.
+ * Also supports expiry mechanism based on a configurable expiry period configured using {@link TimeBucketAssigner}
+ * in {@link ManagedTimeUnifiedStateImpl}
+ * Following steps are used in identifying the state of a particular tuple:
+ * 1. Check if the time of the tuple falls in an expired bucket. If so, the tuple is expired
+ * 2. If the tuple is a valid event, it is checked in the store whether the same key already exists in the
+ * time bucket identified by the event time. If, so, the tuple is a duplicate.
+ * 3. Otherwise the tuple is a unique tuple.
+ *
+ * @param  type of events
+ */
+@Evolving
+@OperatorAnnotation(checkpointableWithinAppWindow = false)
+public abstract class AbstractDeduper
+    implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener
+{
+  /**
+   * The input port on which events are received.
+   */
+  public final transient DefaultInputPort input = new DefaultInputPort()
+  {
+    @Override
+    public final void process(T tuple)
+    {
+      processTuple(tuple);
+    }
+  };
+
+  /**
+   * The output port on which deduped events are emitted.
+   */
+  public final transient DefaultOutputPort unique = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which duplicate events are emitted.
+   */
+  public final transient DefaultOutputPort duplicate = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which expired events are emitted.
+   */
+  public final transient DefaultOutputPort expired = new DefaultOutputPort<>();
+
+  /**
+   * Whether or not the order of tuples be maintained.
+   * Making this "true" might entail some cost in performance, but makes the operator idempotent.
+   */
+  private boolean preserveTupleOrder = true;
+
+  @NotNull
+  protected final ManagedTimeUnifiedStateImpl 

[GitHub] apex-malhar pull request #335: [Review Only] APEXMALHAR-1701 Deduper with Ma...

2016-08-02 Thread chinmaykolhatkar
Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73142614
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java ---
@@ -0,0 +1,459 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
+import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Futures;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Abstract class which allows de-duplicating incoming tuples based on a configured key.
+ * Also supports expiry mechanism based on a configurable expiry period configured using {@link TimeBucketAssigner}
+ * in {@link ManagedTimeUnifiedStateImpl}
+ * Following steps are used in identifying the state of a particular tuple:
+ * 1. Check if the time of the tuple falls in an expired bucket. If so, the tuple is expired
+ * 2. If the tuple is a valid event, it is checked in the store whether the same key already exists in the
+ * time bucket identified by the event time. If, so, the tuple is a duplicate.
+ * 3. Otherwise the tuple is a unique tuple.
+ *
+ * @param  type of events
+ */
+@Evolving
+@OperatorAnnotation(checkpointableWithinAppWindow = false)
+public abstract class AbstractDeduper
+    implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener
+{
+  /**
+   * The input port on which events are received.
+   */
+  public final transient DefaultInputPort input = new DefaultInputPort()
+  {
+    @Override
+    public final void process(T tuple)
+    {
+      processTuple(tuple);
+    }
+  };
+
+  /**
+   * The output port on which deduped events are emitted.
+   */
+  public final transient DefaultOutputPort unique = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which duplicate events are emitted.
+   */
+  public final transient DefaultOutputPort duplicate = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which expired events are emitted.
+   */
+  public final transient DefaultOutputPort expired = new DefaultOutputPort<>();
+
+  /**
+   * Whether or not the order of tuples be maintained.
+   * Making this "true" might entail some cost in performance, but makes the operator idempotent.
+   */
+  private boolean preserveTupleOrder = true;
+
+  @NotNull
+  protected final ManagedTimeUnifiedStateImpl managedState = new ManagedTimeUnifiedStateImpl();
+
+  /**
+   * Map to hold the result of a tuple processing (unique, duplicate, expired or error) until previous
+   * tuples get processed. This is used only when {@link #preserveTupleOrder}

[GitHub] apex-malhar pull request #335: [Review Only] APEXMALHAR-1701 Deduper with Ma...

2016-08-02 Thread chinmaykolhatkar
Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73142554
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java ---
@@ -0,0 +1,459 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
+import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Futures;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Abstract class which allows de-duplicating incoming tuples based on a configured key.
+ * Also supports expiry mechanism based on a configurable expiry period configured using {@link TimeBucketAssigner}
+ * in {@link ManagedTimeUnifiedStateImpl}
+ * Following steps are used in identifying the state of a particular tuple:
+ * 1. Check if the time of the tuple falls in an expired bucket. If so, the tuple is expired
+ * 2. If the tuple is a valid event, it is checked in the store whether the same key already exists in the
+ * time bucket identified by the event time. If, so, the tuple is a duplicate.
+ * 3. Otherwise the tuple is a unique tuple.
+ *
+ * @param  type of events
+ */
+@Evolving
+@OperatorAnnotation(checkpointableWithinAppWindow = false)
+public abstract class AbstractDeduper
+implements Operator, Operator.IdleTimeHandler, 
ActivationListener, Operator.CheckpointNotificationListener
+{
+  /**
+   * The input port on which events are received.
+   */
+  public final transient DefaultInputPort input = new 
DefaultInputPort()
+  {
+@Override
+public final void process(T tuple)
+{
+  processTuple(tuple);
+}
+  };
+
+  /**
+   * The output port on which deduped events are emitted.
+   */
+  public final transient DefaultOutputPort unique = new 
DefaultOutputPort<>();
+
+  /**
+   * The output port on which duplicate events are emitted.
+   */
+  public final transient DefaultOutputPort duplicate = new 
DefaultOutputPort<>();
+
+  /**
+   * The output port on which expired events are emitted.
+   */
+  public final transient DefaultOutputPort expired = new 
DefaultOutputPort<>();
+
+  /**
+   * Whether or not the order of tuples be maintained.
+   * Making this "true" might entail some cost in performance, but makes 
the operator idempotent.
+   */
+  private boolean preserveTupleOrder = true;
+
+  @NotNull
+  protected final ManagedTimeUnifiedStateImpl managedState = new 
ManagedTimeUnifiedStateImpl();
+
+  /**
+   * Map to hold the result of a tuple processing (unique, duplicate, 
expired or error) until previous
+   * tuples get processed. This is used only when {@link 
#preserveTupleOrder} 

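The three-step classification in the `AbstractDeduper` Javadoc above can be illustrated with a small in-memory sketch. It is not the Malhar implementation: `DedupSketch` and its parameters are hypothetical, a `HashMap` of per-bucket key sets stands in for `ManagedTimeUnifiedStateImpl`, and expiry is assumed to be measured against the newest event time seen so far rather than whatever reference `TimeBucketAssigner` actually uses.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical in-memory sketch of the unique / duplicate / expired classification.
 * Assumes fixed-width time buckets and measures expiry against the largest event time seen.
 */
public class DedupSketch
{
  public enum Decision { UNIQUE, DUPLICATE, EXPIRED }

  private final long bucketSpanMillis;   // width of one time bucket (assumed fixed)
  private final long expiryPeriodMillis; // how far behind the newest event a bucket may lag
  private final Map<Long, Set<Object>> keysPerBucket = new HashMap<>();
  private long maxEventTime = Long.MIN_VALUE;

  public DedupSketch(long bucketSpanMillis, long expiryPeriodMillis)
  {
    this.bucketSpanMillis = bucketSpanMillis;
    this.expiryPeriodMillis = expiryPeriodMillis;
  }

  public Decision classify(Object key, long eventTime)
  {
    maxEventTime = Math.max(maxEventTime, eventTime);
    long bucket = eventTime / bucketSpanMillis;
    long oldestLiveBucket = (maxEventTime - expiryPeriodMillis) / bucketSpanMillis;

    // Step 1: the tuple's time falls in an expired bucket.
    if (bucket < oldestLiveBucket) {
      return Decision.EXPIRED;
    }
    // Forget buckets that have expired so memory stays bounded.
    keysPerBucket.keySet().removeIf(b -> b < oldestLiveBucket);

    // Step 2: the key was already seen in the bucket identified by the event time.
    Set<Object> keys = keysPerBucket.computeIfAbsent(bucket, b -> new HashSet<>());
    if (!keys.add(key)) {
      return Decision.DUPLICATE;
    }
    // Step 3: otherwise the tuple is unique.
    return Decision.UNIQUE;
  }

  public static void main(String[] args)
  {
    DedupSketch deduper = new DedupSketch(1_000, 5_000);
    System.out.println(deduper.classify("a", 10_000)); // UNIQUE
    System.out.println(deduper.classify("a", 10_500)); // DUPLICATE: same key, same bucket
    System.out.println(deduper.classify("a", 11_200)); // UNIQUE: same key, newer bucket
    System.out.println(deduper.classify("b", 3_000));  // EXPIRED: bucket 3 < oldest live bucket 6
  }
}
```

Because duplicates are checked only within the bucket identified by the event time, the same key arriving in a later bucket counts as unique, which matches step 2 as worded in the Javadoc.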
[jira] [Commented] (APEXMALHAR-1701) Deduper : create a deduper backed by Managed State

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403869#comment-15403869
 ] 

ASF GitHub Bot commented on APEXMALHAR-1701:


Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73142554
  
--- Diff: library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java ---
@@ -0,0 +1,459 @@

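The `preserveTupleOrder` flag and the result map described in the `AbstractDeduper` Javadoc quoted earlier suggest that lookups complete asynchronously and that results are held back until earlier tuples are resolved. Here is a minimal sketch of that ordering idea. It assumes each classification is exposed as a `CompletableFuture`. The names `OrderedEmitter`, `submit` and `drain` are hypothetical and are not Malhar API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical helper that releases per-tuple results strictly in arrival order,
 * even when later lookups finish before earlier ones.
 */
public class OrderedEmitter<T>
{
  /** One in-flight tuple together with its (possibly still pending) classification. */
  private static final class Pending<E>
  {
    final E tuple;
    final CompletableFuture<String> result;

    Pending(E tuple, CompletableFuture<String> result)
    {
      this.tuple = tuple;
      this.result = result;
    }
  }

  private final ArrayDeque<Pending<T>> queue = new ArrayDeque<>();
  private final List<String> emitted = new ArrayList<>();

  /** Records a tuple and the future that will carry its result. */
  public void submit(T tuple, CompletableFuture<String> result)
  {
    queue.addLast(new Pending<>(tuple, result));
  }

  /**
   * Emits completed results from the head of the queue and stops at the first tuple
   * whose result is not ready, so no later tuple overtakes an earlier one.
   * A future that completed exceptionally would make join() throw here; a real
   * operator would route such tuples to an error path instead.
   */
  public void drain()
  {
    while (!queue.isEmpty() && queue.peekFirst().result.isDone()) {
      Pending<T> head = queue.pollFirst();
      emitted.add(head.tuple + ":" + head.result.join());
    }
  }

  public List<String> emitted()
  {
    return emitted;
  }

  public static void main(String[] args)
  {
    OrderedEmitter<String> emitter = new OrderedEmitter<>();
    CompletableFuture<String> slowLookup = new CompletableFuture<>();
    emitter.submit("a", slowLookup);
    emitter.submit("b", CompletableFuture.completedFuture("unique"));
    emitter.drain();
    System.out.println(emitter.emitted()); // []: "b" is ready but waits behind "a"
    slowLookup.complete("duplicate");
    emitter.drain();
    System.out.println(emitter.emitted()); // [a:duplicate, b:unique]
  }
}
```

With ordering disabled, each result could be emitted as soon as its own future completes. That is presumably the performance trade-off the Javadoc mentions.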
[jira] [Commented] (APEXMALHAR-1701) Deduper : create a deduper backed by Managed State

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403863#comment-15403863
 ] 

ASF GitHub Bot commented on APEXMALHAR-1701:


Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73142273
  
--- Diff: library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java ---
@@ -0,0 +1,459 @@

[jira] [Commented] (APEXMALHAR-1701) Deduper : create a deduper backed by Managed State

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403864#comment-15403864
 ] 

ASF GitHub Bot commented on APEXMALHAR-1701:


Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73142337
  
--- Diff: library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java ---
@@ -0,0 +1,459 @@

[GitHub] apex-malhar pull request #335: [Review Only] APEXMALHAR-1701 Deduper with Ma...

2016-08-02 Thread chinmaykolhatkar
Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73142228
  
--- Diff: library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java ---
@@ -0,0 +1,459 @@

[jira] [Commented] (APEXMALHAR-1701) Deduper : create a deduper backed by Managed State

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403857#comment-15403857
 ] 

ASF GitHub Bot commented on APEXMALHAR-1701:


Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73141988
  
--- Diff: library/src/main/java/org/apache/apex/malhar/lib/dedup/DeduperStreamCodec.java ---
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
+import com.datatorrent.lib.util.PojoUtils;
+import com.datatorrent.lib.util.PojoUtils.Getter;
+
--- End diff --

done


> Deduper : create a deduper backed by Managed State
> --
>
> Key: APEXMALHAR-1701
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-1701
> Project: Apache Apex Malhar
>  Issue Type: Task
>  Components: algorithms
>Reporter: Chandni Singh
>Assignee: Chandni Singh
>
> Need a de-duplicator operator that is based on Managed State.





[GitHub] apex-malhar pull request #335: [Review Only] APEXMALHAR-1701 Deduper with Ma...

2016-08-02 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73142080
  
--- Diff: library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java ---
@@ -0,0 +1,459 @@

[GitHub] apex-malhar pull request #335: [Review Only] APEXMALHAR-1701 Deduper with Ma...

2016-08-02 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73141988
  
--- Diff: library/src/main/java/org/apache/apex/malhar/lib/dedup/DeduperStreamCodec.java ---
@@ -0,0 +1,45 @@
+package org.apache.apex.malhar.lib.dedup;
+
+import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
+import com.datatorrent.lib.util.PojoUtils;
+import com.datatorrent.lib.util.PojoUtils.Getter;
+
--- End diff --

done




[jira] [Commented] (APEXMALHAR-2100) Development of Inner Join Operator using Spillable Datastructures

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403849#comment-15403849
 ] 

ASF GitHub Bot commented on APEXMALHAR-2100:


Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73141529
  
--- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractInnerJoinOperator.java ---
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.util.Arrays;
+import java.util.List;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent;
+import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableComplexComponent;
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * An abstract implementation of an inner join operator. The operator receives tuples from two streams,
+ * applies the join operation based on the constraint and emits the joined value.
+ * Concrete classes should provide implementations of the extractKey, extractTime and mergeTuples methods.
+ *
+ * Properties:
+ * includeFieldStr: Fields to be added to the output tuple, given as a comma-separated list per stream,
+ * with the two streams' lists separated by a semicolon. Ex: Field1,Field2;Field3,Field4
+ * keyFields: Comma-separated key fields, one per stream. Ex: Field1,Field2
+ * timeFields: Comma-separated time fields, one per stream. Ex: Field1,Field2
+ * expiryTime: Expiry time for stored tuples
+ * isStream1KeyPrimary: Specifies whether the stream1 key is primary
+ * isStream2KeyPrimary: Specifies whether the stream2 key is primary
+ *
+ * Example:
+ * The left input port receives customer details and the right input port receives order details.
+ * The schema for Customer is {ID, Name, CTime}.
+ * The schema for Order is {OID, CID, OTime}.
+ * Join the tuples of the Customer and Order streams where Customer.ID = Order.CID, with the constraint
+ * that matched tuples must have timestamps within 5 minutes of each other.
+ * Here, keyFields = ID, CID; timeFields = CTime, OTime; and expiryTime = 5 minutes.
+ *
+ * @displayName Abstract Inner Join Operator
+ * @tags join
+ */
+public abstract class AbstractInnerJoinOperator extends BaseOperator
+{
+  protected transient String[][] includeFields;
+  protected transient List keyFields;
+  protected transient List timeFields;
+  @AutoMetric
+  private long tuplesJoinedPerSec;
+  private double windowTimeSec;
+  private int tuplesCount;
+  @NotNull
+  private String keyFieldsStr;
+  @NotNull
+  private String includeFieldStr;
+  private String timeFieldsStr;
+  private Long stream1ExpiryTime;
+  private Long stream2ExpiryTime;
+  private boolean isStream1KeyPrimary = true;
+  private boolean isStream2KeyPrimary = true;
+  protected SpillableComplexComponent component;
+  protected Spillable.SpillableByteArrayListMultimap stream1Data;
+  protected Spillable.SpillableByteArrayListMultimap stream2Data;
+
+  /**
+   * Process the tuple which are received from input ports with the following steps:
+   * 1) Extract key from the given tuple
+   * 2) Insert  into the store where store is the stream1Data if the tuple
+   * receives from stream1 or viceversa.
+   * 3) Get the values of the key if found it in opposite store
+   * 4) Merge the given tuple and values found from step (3)
+   * @param tuple given tuple
+   * @param isStream1Data Specifies whether the given tuple belongs to stream1 or not.
+   */
+  

[jira] [Commented] (APEXMALHAR-2100) Development of Inner Join Operator using Spillable Datastructures

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403848#comment-15403848
 ] 

ASF GitHub Bot commented on APEXMALHAR-2100:


Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73141506
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/join/AbstractInnerJoinOperator.java 
---
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.util.Arrays;
+import java.util.List;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent;
+import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableComplexComponent;
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * 
+ * An abstract implementation of inner join operator. Operator receives tuples from two streams,
+ * applies the join operation based on constraint and emit the joined value.
+ * Concrete classes should provide implementation to extractKey, extractTime, mergeTuples methods.
+ *
+ * Properties:
+ * includeFieldStr: List of comma separated fields to be added to the output tuple.
+ * Ex: Field1,Field2;Field3,Field4
+ * keyFields: List of comma separated key field for both the streams. Ex: Field1,Field2
+ * timeFields: List of comma separated time field for both the streams. Ex: Field1,Field2
+ * expiryTime: Expiry time for stored tuples
--- End diff --

The expiry time is in milliseconds (ms). Will add that to the Javadoc.
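To make the unit concrete, here is a minimal sketch of the time constraint from the Customer/Order example in the operator Javadoc, assuming expiryTime is in milliseconds and that two tuples match when their timestamps differ by at most expiryTime. `ExpiryCheck` and `withinExpiry` are hypothetical names, not Malhar API, and whether the operator's bound is inclusive is an assumption.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper, not part of Malhar: the join's time constraint in milliseconds. */
final class ExpiryCheck
{
  // 5 minutes in milliseconds, as in the Customer/Order example (300,000 ms).
  static final long EXPIRY_TIME_MS = TimeUnit.MINUTES.toMillis(5);

  private ExpiryCheck()
  {
  }

  /** Returns true when the two event times (epoch ms) are at most expiryMs apart (assumed inclusive). */
  static boolean withinExpiry(long time1Ms, long time2Ms, long expiryMs)
  {
    return Math.abs(time1Ms - time2Ms) <= expiryMs;
  }
}
```

For example, a customer with CTime = 1,000 ms and an order with OTime = 301,000 ms would still match, while OTime = 301,001 ms would not.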


> Development of Inner Join Operator using Spillable Datastructures
> -
>
> Key: APEXMALHAR-2100
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2100
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: Chaitanya
>Assignee: Chaitanya
>






[jira] [Commented] (APEXMALHAR-2100) Development of Inner Join Operator using Spillable Datastructures

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403845#comment-15403845
 ] 

ASF GitHub Bot commented on APEXMALHAR-2100:


Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73141485
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/join/AbstractInnerJoinOperator.java 
---
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.util.Arrays;
+import java.util.List;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent;
+import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableComplexComponent;
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * 
+ * An abstract implementation of inner join operator. Operator receives tuples from two streams,
+ * applies the join operation based on constraint and emit the joined value.
+ * Concrete classes should provide implementation to extractKey, extractTime, mergeTuples methods.
+ *
+ * Properties:
+ * includeFieldStr: List of comma separated fields to be added to the output tuple.
+ * Ex: Field1,Field2;Field3,Field4
+ * keyFields: List of comma separated key field for both the streams. Ex: Field1,Field2
--- End diff --

Yes. If each stream has multiple key fields, specify the primary key field in
keyFields and handle the remaining key fields in mergeTuples().
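A hypothetical illustration of that split, assuming tuples are modeled as field maps: the operator matches on the primary key from keyFields, and a mergeTuples-style method rejects the pair when a secondary key field differs. `SecondaryKeyMerge`, the map representation, and the `Region`/`ORegion` field names are assumptions; the real mergeTuples() signature is not shown in this excerpt.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch, not Malhar code: secondary key fields checked at merge time. */
final class SecondaryKeyMerge
{
  private SecondaryKeyMerge()
  {
  }

  /**
   * Called only for tuples that already matched on the primary key.
   * Returns the merged tuple, or null when the secondary key fields are
   * missing or different, in which case nothing should be emitted.
   */
  static Map<String, Object> mergeTuples(Map<String, Object> left, Map<String, Object> right,
      String leftSecondaryKey, String rightSecondaryKey)
  {
    Object leftValue = left.get(leftSecondaryKey);
    Object rightValue = right.get(rightSecondaryKey);
    if (leftValue == null || !leftValue.equals(rightValue)) {
      return null;
    }
    // Combine the fields of both tuples into a new output tuple.
    Map<String, Object> merged = new HashMap<>(left);
    merged.putAll(right);
    return merged;
  }
}
```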


> Development of Inner Join Operator using Spillable Datastructures
> -
>
> Key: APEXMALHAR-2100
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2100
> Project: Apache Apex Malhar
>  Issue Type: Task
>Reporter: Chaitanya
>Assignee: Chaitanya
>






[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...

2016-08-02 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73141502
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/join/AbstractInnerJoinOperator.java 
---
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.util.Arrays;
+import java.util.List;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent;
+import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableComplexComponent;
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * 
+ * An abstract implementation of inner join operator. Operator receives tuples from two streams,
+ * applies the join operation based on constraint and emit the joined value.
+ * Concrete classes should provide implementation to extractKey, extractTime, mergeTuples methods.
+ *
+ * Properties:
+ * includeFieldStr: List of comma separated fields to be added to the output tuple.
+ * Ex: Field1,Field2;Field3,Field4
+ * keyFields: List of comma separated key field for both the streams. Ex: Field1,Field2
+ * timeFields: List of comma separated time field for both the streams. Ex: Field1,Field2
--- End diff --

Yes, it can be. If a stream's time field is not in milliseconds, the user has
to override the extractTime() method and convert the field to milliseconds.
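A sketch of such an override, assuming a stream whose time field holds epoch seconds. `TimeExtractingJoin` is a hypothetical stand-in for the abstract operator, `SecondsEvent` is a hypothetical tuple type, and the `extractTime(T, boolean)` signature is modeled on the `extractKey(tuple, isStream1Data)` call in the diff rather than copied from Malhar.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the join operator's time extraction hook. */
abstract class TimeExtractingJoin<T>
{
  /** Must return the tuple's event time in epoch milliseconds. */
  protected abstract long extractTime(T tuple, boolean isStream1Data);
}

/** Hypothetical tuple whose timestamp field is in epoch seconds. */
final class SecondsEvent
{
  final long epochSeconds;

  SecondsEvent(long epochSeconds)
  {
    this.epochSeconds = epochSeconds;
  }
}

final class SecondsEventJoin extends TimeExtractingJoin<SecondsEvent>
{
  @Override
  protected long extractTime(SecondsEvent tuple, boolean isStream1Data)
  {
    // Both streams use the same field here; convert seconds to the
    // milliseconds the expiry logic expects.
    return TimeUnit.SECONDS.toMillis(tuple.epochSeconds);
  }
}
```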




[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...

2016-08-02 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73141529
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/join/AbstractInnerJoinOperator.java 
---
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.util.Arrays;
+import java.util.List;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent;
+import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableComplexComponent;
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * 
+ * An abstract implementation of inner join operator. Operator receives tuples from two streams,
+ * applies the join operation based on constraint and emit the joined value.
+ * Concrete classes should provide implementation to extractKey, extractTime, mergeTuples methods.
+ *
+ * Properties:
+ * includeFieldStr: List of comma separated fields to be added to the output tuple.
+ * Ex: Field1,Field2;Field3,Field4
+ * keyFields: List of comma separated key field for both the streams. Ex: Field1,Field2
+ * timeFields: List of comma separated time field for both the streams. Ex: Field1,Field2
+ * expiryTime: Expiry time for stored tuples
+ * isStream1KeyPrimary: : Specifies whether the stream1 key is primary or not
+ * isStream2KeyPrimary: : Specifies whether the stream2 key is primary or not
+ *
+ *  Example:
+ *  Left input port receives customer details and right input port receives Order details.
+ *  Schema for the Customer be in the form of {ID, Name, CTime}
+ *  Schema for the Order be in the form of {OID, CID, OTime}
+ *  Now, Join the tuples of Customer and Order streams where Customer.ID = Order.CID and the constraint is
+ *  matched tuples must have timestamp within 5 minutes.
+ *  Here, key Fields = ID, CID and Time Fields = CTime, OTime, expiryTime = 5 minutes
+ *
+ *  @displayName Abstract Inner Join Operator
+ *  @tags join
+ */
+public abstract class AbstractInnerJoinOperator extends BaseOperator
+{
+  protected transient String[][] includeFields;
+  protected transient List keyFields;
+  protected transient List timeFields;
+  @AutoMetric
+  private long tuplesJoinedPerSec;
+  private double windowTimeSec;
+  private int tuplesCount;
+  @NotNull
+  private String keyFieldsStr;
+  @NotNull
+  private String includeFieldStr;
+  private String timeFieldsStr;
+  private Long stream1ExpiryTime;
+  private Long stream2ExpiryTime;
+  private boolean isStream1KeyPrimary = true;
+  private boolean isStream2KeyPrimary = true;
+  protected SpillableComplexComponent component;
+  protected Spillable.SpillableByteArrayListMultimap stream1Data;
+  protected Spillable.SpillableByteArrayListMultimap stream2Data;
+
+  /**
+   * Process the tuple which are received from input ports with the following steps:
+   * 1) Extract key from the given tuple
+   * 2) Insert  into the store where store is the stream1Data if the tuple
+   * receives from stream1 or viceversa.
+   * 3) Get the values of the key if found it in opposite store
+   * 4) Merge the given tuple and values found from step (3)
+   * @param tuple given tuple
+   * @param isStream1Data Specifies whether the given tuple belongs to stream1 or not.
+   */
+  protected void processTuple(T tuple, boolean isStream1Data)
+  {
+    Spillable.SpillableByteArrayListMultimap store = isStream1Data ? stream1Data : stream2Data;
+    K key = extractKey(tuple,isStream1Data);
+    if (!store.put(key,

[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...

2016-08-02 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73141506
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/join/AbstractInnerJoinOperator.java 
---
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.util.Arrays;
+import java.util.List;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent;
+import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableComplexComponent;
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * 
+ * An abstract implementation of inner join operator. Operator receives tuples from two streams,
+ * applies the join operation based on constraint and emit the joined value.
+ * Concrete classes should provide implementation to extractKey, extractTime, mergeTuples methods.
+ *
+ * Properties:
+ * includeFieldStr: List of comma separated fields to be added to the output tuple.
+ * Ex: Field1,Field2;Field3,Field4
+ * keyFields: List of comma separated key field for both the streams. Ex: Field1,Field2
+ * timeFields: List of comma separated time field for both the streams. Ex: Field1,Field2
+ * expiryTime: Expiry time for stored tuples
--- End diff --

The expiry time is in milliseconds (ms). Will add that to the Javadoc.
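The four processTuple() steps listed in the operator's Javadoc, combined with a millisecond expiry, can be sketched in memory as follows. This assumes a `HashMap` of lists in place of `SpillableByteArrayListMultimap`, string keys, and a pairwise time check; `InMemoryInnerJoin` and its `Event` type are hypothetical, and eviction of expired tuples and spilling to disk are deliberately left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory sketch of the join steps; not the Malhar implementation. */
final class InMemoryInnerJoin
{
  static final class Event
  {
    final String key;
    final long timeMs;
    final String payload;

    Event(String key, long timeMs, String payload)
    {
      this.key = key;
      this.timeMs = timeMs;
      this.payload = payload;
    }
  }

  private final Map<String, List<Event>> stream1Data = new HashMap<>();
  private final Map<String, List<Event>> stream2Data = new HashMap<>();
  private final long expiryTimeMs;

  InMemoryInnerJoin(long expiryTimeMs)
  {
    this.expiryTimeMs = expiryTimeMs;
  }

  /** Returns the joined outputs ("stream1|stream2" payloads) produced by this tuple. */
  List<String> processTuple(Event tuple, boolean isStream1Data)
  {
    // 1) Extract the key and 2) insert the tuple into its own stream's store.
    Map<String, List<Event>> store = isStream1Data ? stream1Data : stream2Data;
    store.computeIfAbsent(tuple.key, k -> new ArrayList<>()).add(tuple);

    // 3) Look up the same key in the opposite store.
    Map<String, List<Event>> other = isStream1Data ? stream2Data : stream1Data;
    List<String> joined = new ArrayList<>();
    for (Event match : other.getOrDefault(tuple.key, Collections.emptyList())) {
      // 4) Merge only the matches that satisfy the time constraint.
      if (Math.abs(tuple.timeMs - match.timeMs) <= expiryTimeMs) {
        Event left = isStream1Data ? tuple : match;
        Event right = isStream1Data ? match : tuple;
        joined.add(left.payload + "|" + right.payload);
      }
    }
    return joined;
  }
}
```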




[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...

2016-08-02 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/330#discussion_r73141485
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/join/AbstractInnerJoinOperator.java 
---
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.util.Arrays;
+import java.util.List;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.spillable.Spillable;
+import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent;
+import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableComplexComponent;
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * 
+ * An abstract implementation of inner join operator. Operator receives tuples from two streams,
+ * applies the join operation based on constraint and emit the joined value.
+ * Concrete classes should provide implementation to extractKey, extractTime, mergeTuples methods.
+ *
+ * Properties:
+ * includeFieldStr: List of comma separated fields to be added to the output tuple.
+ * Ex: Field1,Field2;Field3,Field4
+ * keyFields: List of comma separated key field for both the streams. Ex: Field1,Field2
--- End diff --

Yes. If each stream has multiple key fields, specify the primary key field in
keyFields and handle the remaining key fields in mergeTuples().
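For the property formats in the Javadoc (keyFields like `Field1,Field2`, includeFieldStr like `Field1,Field2;Field3,Field4`), a parser might look like this, assuming the first entry belongs to stream1 and the second to stream2. `JoinFieldConfig` is a hypothetical helper; the operator's own parsing code is not part of this excerpt.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical parser for the join's string properties; not Malhar's own code. */
final class JoinFieldConfig
{
  private JoinFieldConfig()
  {
  }

  /** "ID,CID" -> [ID, CID]: index 0 is the stream1 key field, index 1 the stream2 key field. */
  static List<String> parseKeyFields(String keyFieldsStr)
  {
    List<String> keys = Arrays.asList(keyFieldsStr.trim().split("\\s*,\\s*"));
    if (keys.size() != 2) {
      throw new IllegalArgumentException("Expected one key field per stream: " + keyFieldsStr);
    }
    return keys;
  }

  /** "ID,Name;OID,OTime" -> {{ID, Name}, {OID, OTime}}: output fields per stream. */
  static String[][] parseIncludeFields(String includeFieldStr)
  {
    String[] perStream = includeFieldStr.split(";");
    if (perStream.length != 2) {
      throw new IllegalArgumentException("Expected two ';'-separated groups: " + includeFieldStr);
    }
    String[][] fields = new String[2][];
    for (int i = 0; i < perStream.length; i++) {
      fields[i] = perStream[i].trim().split("\\s*,\\s*");
    }
    return fields;
  }
}
```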




[jira] [Commented] (APEXMALHAR-1701) Deduper : create a deduper backed by Managed State

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403840#comment-15403840
 ] 

ASF GitHub Bot commented on APEXMALHAR-1701:


Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73141245
  
--- Diff: 
library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperIdempotencyTest.java
 ---
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.io.IOException;
+import java.util.Date;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+
+public class DeduperIdempotencyTest
--- End diff --

Nit: Can you rename this to DeduperOrderTest? I don't think this really tests
idempotency. :)


> Deduper : create a deduper backed by Managed State
> --
>
> Key: APEXMALHAR-1701
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-1701
> Project: Apache Apex Malhar
>  Issue Type: Task
>  Components: algorithms
>Reporter: Chandni Singh
>Assignee: Chandni Singh
>
> Need a deduplicator operator that is based on Managed State.





[GitHub] apex-malhar pull request #335: [Review Only] APEXMALHAR-1701 Deduper with Ma...

2016-08-02 Thread chinmaykolhatkar
Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73141245
  
--- Diff: 
library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperIdempotencyTest.java
 ---
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.io.IOException;
+import java.util.Date;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+
+public class DeduperIdempotencyTest
--- End diff --

Nit: Can you rename this to DeduperOrderTest? I don't think this really tests
idempotency. :)




[jira] [Commented] (APEXMALHAR-1701) Deduper : create a deduper backed by Managed State

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403837#comment-15403837
 ] 

ASF GitHub Bot commented on APEXMALHAR-1701:


Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73141100
  
--- Diff: 
library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperPartitioningTest.java
 ---
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Random;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.Maps;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.common.partitioner.StatelessPartitioner;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+
+/**
+ * Tests whether the operator functions correctly when partitioned
+ * The partitioning in Dedup is overridden by partitioning on basis of the key in the tuple.
+ */
+public class DeduperPartitioningTest
+{
+  public static final int NUM_DEDUP_PARTITIONS = 5;
+  private static boolean testFailed = false;
+
+  /**
+   * Application to test the partitioning
+   *
+   */
+  public static class TestDedupApp implements StreamingApplication
+  {
+    @Override
+    public void populateDAG(DAG dag, Configuration conf)
+    {
+      TestGenerator gen = dag.addOperator("Generator", new TestGenerator());
+
+      TestDeduper dedup = dag.addOperator("Deduper", new TestDeduper());
+      dedup.setKeyExpression("id");
+      dedup.setTimeExpression("eventTime.getTime()");
+      dedup.setBucketSpan(60);
+      dedup.setExpireBefore(600);
+
+      ConsoleOutputOperator console = dag.addOperator("Console", new ConsoleOutputOperator());
+      dag.addStream("Generator to Dedup", gen.output, dedup.input);
+      dag.addStream("Dedup to Console", dedup.unique, console.input);
+      dag.setInputPortAttribute(dedup.input, Context.PortContext.TUPLE_CLASS, TestEvent.class);
+      dag.setOutputPortAttribute(dedup.unique, Context.PortContext.TUPLE_CLASS, TestEvent.class);
+      dag.setAttribute(dedup, Context.OperatorContext.PARTITIONER,
+          new StatelessPartitioner(NUM_DEDUP_PARTITIONS));
+    }
+  }
+
+  public static class TestDeduper extends TimeBasedDedupOperator
+  {
+    int operatorId;
+    boolean started = false;
+    HashMap partitionMap = Maps.newHashMap();
+
+    @Override
+    public void setup(OperatorContext context)
+    {
+      super.setup(context);
+      operatorId = context.getId();
+    }
+
+    @Override
+    protected void processTuple(Object tuple)
+    {
+      TestEvent event = (TestEvent)tuple;
+      if (partitionMap.containsKey(event.id)) {
+        if (partitionMap.get(event.id) != operatorId) {
+          testFailed = true;
+          throw new RuntimeException("Wrong tuple assignment");
+        }
+      } else {
+        partitionMap.put(event.id, operatorId);
+      }
+    }
+  }
+
+  public static class TestGenerator extends BaseOperator implements InputOperator
+  {
+
+    public final transient DefaultOutputPort output = new DefaultOutputPort<>();
+    private final transient Random r = new Random();
+
+    @Override
+    public void

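The invariant DeduperPartitioningTest checks (every tuple with a given key is handled by the same partition) can be sketched in plain Java. The hash-modulo assignment in `partitionFor` is an assumption made for illustration, not the Apex partitioner, and `KeyPartitioning`/`AssignmentChecker` are hypothetical names mirroring the test's `partitionMap` check.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of key-based partitioning and the test's assignment check. */
final class KeyPartitioning
{
  private KeyPartitioning()
  {
  }

  /** Assumed rule: same key always maps to the same partition in [0, numPartitions). */
  static int partitionFor(Object key, int numPartitions)
  {
    // floorMod keeps the result non-negative even for negative hash codes.
    return Math.floorMod(key.hashCode(), numPartitions);
  }

  /** Records key -> partition and fails if a key shows up on two different partitions. */
  static final class AssignmentChecker
  {
    private final Map<Object, Integer> partitionMap = new HashMap<>();

    void record(Object key, int partitionId)
    {
      Integer previous = partitionMap.putIfAbsent(key, partitionId);
      if (previous != null && previous.intValue() != partitionId) {
        throw new IllegalStateException("Wrong tuple assignment for key " + key);
      }
    }
  }
}
```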
[GitHub] apex-malhar pull request #335: [Review Only] APEXMALHAR-1701 Deduper with Ma...

2016-08-02 Thread chinmaykolhatkar
Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73141070
  
--- Diff: 
library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperIdempotencyTest.java
 ---
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.io.IOException;
+import java.util.Date;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+
+public class DeduperIdempotencyTest
+{
+  public static boolean testFailed = false;
+
+  @Test
+  public void testApplication() throws IOException, Exception
+  {
+try {
+  LocalMode lma = LocalMode.newInstance();
+  Configuration conf = new Configuration(false);
+  lma.prepareDAG(new DeduperIdempotencyTestApp(), conf);
+  LocalMode.Controller lc = lma.getController();
+  lc.runAsync();
--- End diff --

Can you run the test with the Controller in async mode, using an approach similar to:
com.datatorrent.lib.io.fs.FileSplitterBaseTest.testSplitterInApp

This gives a deterministic way of verifying the cases.

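A minimal, dependency-free sketch of the pattern suggested above. It assumes the referenced test starts the application asynchronously, polls an observable completion condition with a timeout, and only then shuts down and asserts. The helper `awaitCondition` and the background thread standing in for `lc.runAsync()` are hypothetical; no Apex API is used.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

class AwaitConditionSketch
{
  /**
   * Polls {@code done} every {@code pollMillis} until it returns true or {@code timeoutMillis} elapses.
   * Returns true if the condition was met, false on timeout or interruption.
   */
  static boolean awaitCondition(BooleanSupplier done, long timeoutMillis, long pollMillis)
  {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    while (!done.getAsBoolean()) {
      // Overflow-safe deadline comparison for nanoTime values.
      if (System.nanoTime() - deadline >= 0) {
        return false;
      }
      try {
        Thread.sleep(pollMillis);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args)
  {
    // Hypothetical stand-in for the application started with lc.runAsync():
    // a background thread that "processes" 100 tuples.
    AtomicInteger processed = new AtomicInteger();
    Thread app = new Thread(() -> {
      for (int i = 0; i < 100; i++) {
        processed.incrementAndGet();
      }
    });
    app.setDaemon(true);
    app.start();

    boolean finished = awaitCondition(() -> processed.get() == 100, 5_000, 10);
    // In the real test, the controller would be shut down here, before asserting.
    System.out.println("finished=" + finished + ", processed=" + processed.get());
  }
}
```

Polling a concrete condition, instead of running for a fixed duration, lets the test stop as soon as the expected state is reached and fail clearly on timeout.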



[GitHub] apex-malhar pull request #335: [Review Only] APEXMALHAR-1701 Deduper with Ma...

2016-08-02 Thread chinmaykolhatkar
Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73141100
  
--- Diff: 
library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperPartitioningTest.java
 ---
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Random;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.Maps;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.common.partitioner.StatelessPartitioner;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+
+/**
+ * Tests whether the operator functions correctly when partitioned.
+ * The partitioning in Dedup is overridden by partitioning on the basis of the key in the tuple.
+ *
+ */
+public class DeduperPartitioningTest
+{
+  public static final int NUM_DEDUP_PARTITIONS = 5;
+  private static boolean testFailed = false;
+
+  /**
+   * Application to test the partitioning
+   *
+   */
+  public static class TestDedupApp implements StreamingApplication
+  {
+@Override
+public void populateDAG(DAG dag, Configuration conf)
+{
+  TestGenerator gen = dag.addOperator("Generator", new TestGenerator());
+
+  TestDeduper dedup = dag.addOperator("Deduper", new TestDeduper());
+  dedup.setKeyExpression("id");
+  dedup.setTimeExpression("eventTime.getTime()");
+  dedup.setBucketSpan(60);
+  dedup.setExpireBefore(600);
+  
+  ConsoleOutputOperator console = dag.addOperator("Console", new ConsoleOutputOperator());
+  dag.addStream("Generator to Dedup", gen.output, dedup.input);
+  dag.addStream("Dedup to Console", dedup.unique, console.input);
+  dag.setInputPortAttribute(dedup.input, Context.PortContext.TUPLE_CLASS, TestEvent.class);
+  dag.setOutputPortAttribute(dedup.unique, Context.PortContext.TUPLE_CLASS, TestEvent.class);
+  dag.setAttribute(dedup, Context.OperatorContext.PARTITIONER,
+  new StatelessPartitioner(NUM_DEDUP_PARTITIONS));
+}
+  }
+
+  public static class TestDeduper extends TimeBasedDedupOperator
+  {
+int operatorId;
+boolean started = false;
+HashMap<Integer, Integer> partitionMap = Maps.newHashMap();
+
+@Override
+public void setup(OperatorContext context)
+{
+  super.setup(context);
+  operatorId = context.getId();
+}
+
+@Override
+protected void processTuple(Object tuple)
+{
+  TestEvent event = (TestEvent)tuple;
+  if (partitionMap.containsKey(event.id)) {
+if (partitionMap.get(event.id) != operatorId) {
+  testFailed = true;
+  throw new RuntimeException("Wrong tuple assignment");
+}
+  } else {
+partitionMap.put(event.id, operatorId);
+  }
+}
+  }
+
+  public static class TestGenerator extends BaseOperator implements InputOperator
+  {
+
+public final transient DefaultOutputPort<TestEvent> output = new DefaultOutputPort<>();
+private final transient Random r = new Random();
+
+@Override
+public void emitTuples()
+{
+  TestEvent event = new TestEvent();
+  event.id = r.nextInt(100);
+  output.emit(event);
+}
+  }
+
+  public static class TestEvent
+  {
+private int id;
+private 

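The invariant that `TestDeduper.processTuple` checks (a given key must always arrive at the same partition) can also be exercised without launching an application. In this sketch, `route` (key hash modulo partition count) is a hypothetical stand-in for key-based partitioning, not Apex's actual partitioning code; `isConsistent` mirrors the `partitionMap` check with explicit `Integer` types.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

class PartitionConsistencySketch
{
  /** Hypothetical router: picks a partition from the tuple key only. */
  static int route(int key, int numPartitions)
  {
    // floorMod keeps the result non-negative even for negative hash codes.
    return Math.floorMod(Integer.hashCode(key), numPartitions);
  }

  /**
   * Mirrors TestDeduper.processTuple: remembers the first partition each key was seen on
   * and reports a violation if the key later shows up on a different partition.
   */
  static boolean isConsistent(int[] keys, int[] assignedPartitions)
  {
    Map<Integer, Integer> partitionMap = new HashMap<>();
    for (int i = 0; i < keys.length; i++) {
      Integer previous = partitionMap.putIfAbsent(keys[i], assignedPartitions[i]);
      // previous is unboxed here, so this is a numeric comparison.
      if (previous != null && previous != assignedPartitions[i]) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args)
  {
    final int numPartitions = 5; // same value as NUM_DEDUP_PARTITIONS in the test
    Random r = new Random(42);
    int[] keys = new int[1000];
    int[] partitions = new int[keys.length];
    for (int i = 0; i < keys.length; i++) {
      keys[i] = r.nextInt(100); // same key range as TestGenerator
      partitions[i] = route(keys[i], numPartitions);
    }
    System.out.println("consistent=" + isConsistent(keys, partitions));
  }
}
```

With a typed `Map<Integer, Integer>`, the `!=` check compares unboxed `int` values. With a raw map, the result of `get` would be an `Object`, and that comparison would not express the intended numeric check.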
[jira] [Commented] (APEXMALHAR-1701) Deduper : create a deduper backed by Managed State

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403809#comment-15403809
 ] 

ASF GitHub Bot commented on APEXMALHAR-1701:


Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73139258
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java ---
@@ -0,0 +1,459 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
+import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Futures;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Abstract class which allows de-duplicating incoming tuples based on a configured key.
+ * Also supports an expiry mechanism based on a configurable expiry period, configured using
+ * {@link TimeBucketAssigner} in {@link ManagedTimeUnifiedStateImpl}.
+ * The following steps are used to identify the state of a particular tuple:
+ * 1. Check whether the time of the tuple falls in an expired bucket. If so, the tuple is expired.
+ * 2. If the tuple is a valid event, check the store to see whether the same key already exists in the
+ * time bucket identified by the event time. If so, the tuple is a duplicate.
+ * 3. Otherwise, the tuple is unique.
+ *
+ * @param <T> type of events
+ */
+@Evolving
+@OperatorAnnotation(checkpointableWithinAppWindow = false)
+public abstract class AbstractDeduper<T>
+implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener
+{
+  /**
+   * The input port on which events are received.
+   */
+  public final transient DefaultInputPort<T> input = new DefaultInputPort<T>()
+  {
+@Override
+public final void process(T tuple)
+{
+  processTuple(tuple);
+}
+  };
+
+  /**
+   * The output port on which deduped events are emitted.
+   */
+  public final transient DefaultOutputPort<T> unique = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which duplicate events are emitted.
+   */
+  public final transient DefaultOutputPort<T> duplicate = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which expired events are emitted.
+   */
+  public final transient DefaultOutputPort<T> expired = new DefaultOutputPort<>();
+
+  /**
+   * Whether or not the order of tuples should be maintained.
+   * Setting this to "true" may cost some performance, but makes the operator idempotent.
+   */
+  private boolean preserveTupleOrder = true;
+
+  @NotNull
+  protected final ManagedTimeUnifiedStateImpl 

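A minimal in-memory sketch of the three-step decision described in the Javadoc above. The following assumptions are not taken from the PR. Expiry is measured against the largest event time seen so far. Keys are held in a `HashMap` of per-bucket sets instead of `ManagedTimeUnifiedStateImpl`. The class and parameter names are hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class DedupDecisionSketch
{
  enum Decision { UNIQUE, DUPLICATE, EXPIRED }

  private final long bucketSpanMillis;
  private final long expireBeforeMillis;
  // Hypothetical in-memory stand-in for managed state: time bucket -> keys seen in that bucket.
  private final Map<Long, Set<Object>> buckets = new HashMap<>();
  private long maxEventTime = Long.MIN_VALUE;

  DedupDecisionSketch(long bucketSpanMillis, long expireBeforeMillis)
  {
    this.bucketSpanMillis = bucketSpanMillis;
    this.expireBeforeMillis = expireBeforeMillis;
  }

  /** Assumes non-negative event times, so integer division gives the bucket index. */
  Decision process(Object key, long eventTime)
  {
    maxEventTime = Math.max(maxEventTime, eventTime);
    long bucket = eventTime / bucketSpanMillis;
    long oldestLiveBucket = (maxEventTime - expireBeforeMillis) / bucketSpanMillis;
    // Drop state for buckets that can no longer receive valid tuples.
    buckets.keySet().removeIf(b -> b < oldestLiveBucket);

    // Step 1: a tuple whose bucket has expired is not looked up in the store.
    if (bucket < oldestLiveBucket) {
      return Decision.EXPIRED;
    }
    // Steps 2 and 3: same key already in this bucket -> duplicate; otherwise record it as unique.
    Set<Object> keys = buckets.computeIfAbsent(bucket, b -> new HashSet<>());
    return keys.add(key) ? Decision.UNIQUE : Decision.DUPLICATE;
  }

  public static void main(String[] args)
  {
    DedupDecisionSketch dedup = new DedupDecisionSketch(10_000, 60_000);
    System.out.println(dedup.process("a", 1_000));   // UNIQUE
    System.out.println(dedup.process("a", 2_000));   // DUPLICATE (same key, same bucket)
    System.out.println(dedup.process("b", 100_000)); // UNIQUE; buckets before index 4 are now expired
    System.out.println(dedup.process("a", 5_000));   // EXPIRED
    System.out.println(dedup.process("a", 100_500)); // UNIQUE (different bucket from the first "a")
  }
}
```

As in the Javadoc, a key counts as a duplicate only within the time bucket its event time maps to. The same key in a later bucket is reported as unique.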
[jira] [Commented] (APEXMALHAR-1701) Deduper : create a deduper backed by Managed State

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403808#comment-15403808
 ] 

ASF GitHub Bot commented on APEXMALHAR-1701:


Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73139110
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java ---
@@ -0,0 +1,462 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
+import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Futures;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Abstract class which allows de-duplicating incoming tuples based on a configured key.
+ * Also supports an expiry mechanism based on a configurable expiry period, configured using
+ * {@link TimeBucketAssigner} in {@link ManagedTimeUnifiedStateImpl}.
+ *
+ * @param <T> type of events
+ */
+@Evolving
+@OperatorAnnotation(checkpointableWithinAppWindow = false)
+public abstract class AbstractDeduper<T>
+implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener
+{
+  /**
+   * The input port on which events are received.
+   */
+  public final transient DefaultInputPort<T> input = new DefaultInputPort<T>()
+  {
+@Override
+public final void process(T tuple)
+{
+  processTuple(tuple);
+}
+  };
+
+  /**
+   * The output port on which deduped events are emitted.
+   */
+  public final transient DefaultOutputPort<T> unique = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which duplicate events are emitted.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<T> duplicate = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which expired events are emitted.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<T> expired = new DefaultOutputPort<>();
+
+  /**
+   * Whether or not the order of tuples should be maintained.
+   */
+  private boolean orderedOutput = false;
+
+  @NotNull
+  protected final ManagedTimeUnifiedStateImpl managedState = new ManagedTimeUnifiedStateImpl();
+
+  /**
+   * Map holding the result of processing a tuple (unique, duplicate, expired or error) until all previous
+   * tuples have been processed. This is used only when {@link #orderedOutput} is true.
+   */
+  private transient Map decisions;
+  private transient long sleepMillis;

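The `decisions` map above holds results until earlier tuples have been processed. One way to get that ordering is sketched below under stated assumptions. Each tuple's asynchronous lookup is represented by a `CompletableFuture<Boolean>` (true means unique). Pending tuples sit in a FIFO queue. `drain()` emits only from the head, and only while the head has been decided. The class and method names are hypothetical, and only the unique/duplicate outcomes are modeled.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

class OrderedEmitterSketch<T>
{
  /** A tuple together with its (possibly still running) uniqueness lookup. */
  private static final class Pending<E>
  {
    final E tuple;
    final CompletableFuture<Boolean> isUnique;

    Pending(E tuple, CompletableFuture<Boolean> isUnique)
    {
      this.tuple = tuple;
      this.isUnique = isUnique;
    }
  }

  // FIFO queue in arrival order; equal tuples get separate entries.
  private final ArrayDeque<Pending<T>> pending = new ArrayDeque<>();
  private final Consumer<T> unique;
  private final Consumer<T> duplicate;

  OrderedEmitterSketch(Consumer<T> unique, Consumer<T> duplicate)
  {
    this.unique = unique;
    this.duplicate = duplicate;
  }

  /** Queues a tuple with its lookup result and emits whatever is already in order. */
  void submit(T tuple, CompletableFuture<Boolean> isUnique)
  {
    pending.add(new Pending<>(tuple, isUnique));
    drain();
  }

  /** Emits decided tuples from the head, stopping at the first one whose lookup is not done. */
  void drain()
  {
    while (!pending.isEmpty() && pending.peek().isUnique.isDone()) {
      Pending<T> next = pending.poll();
      if (next.isUnique.join()) {
        unique.accept(next.tuple);
      } else {
        duplicate.accept(next.tuple);
      }
    }
  }

  int waiting()
  {
    return pending.size();
  }

  public static void main(String[] args)
  {
    List<String> out = new ArrayList<>();
    OrderedEmitterSketch<String> emitter =
        new OrderedEmitterSketch<>(t -> out.add("unique:" + t), t -> out.add("duplicate:" + t));

    CompletableFuture<Boolean> slowLookup = new CompletableFuture<>();
    emitter.submit("a", slowLookup);                               // lookup for "a" still running
    emitter.submit("b", CompletableFuture.completedFuture(false)); // decided, but queued behind "a"
    System.out.println(out + " waiting=" + emitter.waiting());     // [] waiting=2

    slowLookup.complete(true);
    emitter.drain();
    System.out.println(out + " waiting=" + emitter.waiting());     // [unique:a, duplicate:b] waiting=0
  }
}
```

The sketch uses queue entries rather than map keys, so equal tuples (which a deduper will routinely see) do not overwrite one another. In an operator, `drain()` would have to be called again later. An idle-time callback is one possible place, since the class implements `Operator.IdleTimeHandler`, but where the PR actually does this is not shown here.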
[GitHub] apex-malhar pull request #335: [Review Only] APEXMALHAR-1701 Deduper with Ma...

2016-08-02 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73139110
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java ---
@@ -0,0 +1,462 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
+import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Futures;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Abstract class which allows de-duplicating incoming tuples based on a configured key.
+ * Also supports an expiry mechanism based on a configurable expiry period, configured using
+ * {@link TimeBucketAssigner} in {@link ManagedTimeUnifiedStateImpl}.
+ *
+ * @param <T> type of events
+ */
+@Evolving
+@OperatorAnnotation(checkpointableWithinAppWindow = false)
+public abstract class AbstractDeduper<T>
+implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener
+{
+  /**
+   * The input port on which events are received.
+   */
+  public final transient DefaultInputPort<T> input = new DefaultInputPort<T>()
+  {
+@Override
+public final void process(T tuple)
+{
+  processTuple(tuple);
+}
+  };
+
+  /**
+   * The output port on which deduped events are emitted.
+   */
+  public final transient DefaultOutputPort<T> unique = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which duplicate events are emitted.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<T> duplicate = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which expired events are emitted.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<T> expired = new DefaultOutputPort<>();
+
+  /**
+   * Whether or not the order of tuples should be maintained.
+   */
+  private boolean orderedOutput = false;
+
+  @NotNull
+  protected final ManagedTimeUnifiedStateImpl managedState = new ManagedTimeUnifiedStateImpl();
+
+  /**
+   * Map holding the result of processing a tuple (unique, duplicate, expired or error) until all previous
+   * tuples have been processed. This is used only when {@link #orderedOutput} is true.
+   */
+  private transient Map decisions;
+  private transient long sleepMillis;
+  private transient Map waitingEvents = Maps.newLinkedHashMap();
+  private transient Map asyncEvents = Maps.newLinkedHashMap();
+
+  // Metrics
+  @AutoMetric
+  private transient long uniqueEvents;
+  

[jira] [Commented] (APEXMALHAR-1701) Deduper : create a deduper backed by Managed State

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403802#comment-15403802
 ] 

ASF GitHub Bot commented on APEXMALHAR-1701:


Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73139012
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java ---
@@ -0,0 +1,462 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
+import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Futures;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Abstract class which allows de-duplicating incoming tuples based on a configured key.
+ * Also supports an expiry mechanism based on a configurable expiry period, configured using
+ * {@link TimeBucketAssigner} in {@link ManagedTimeUnifiedStateImpl}.
+ *
+ * @param <T> type of events
+ */
+@Evolving
+@OperatorAnnotation(checkpointableWithinAppWindow = false)
+public abstract class AbstractDeduper<T>
+implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener
+{
+  /**
+   * The input port on which events are received.
+   */
+  public final transient DefaultInputPort<T> input = new DefaultInputPort<T>()
+  {
+@Override
+public final void process(T tuple)
+{
+  processTuple(tuple);
+}
+  };
+
+  /**
+   * The output port on which deduped events are emitted.
+   */
+  public final transient DefaultOutputPort<T> unique = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which duplicate events are emitted.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<T> duplicate = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which expired events are emitted.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<T> expired = new DefaultOutputPort<>();
+
+  /**
+   * Whether or not the order of tuples should be maintained.
+   */
+  private boolean orderedOutput = false;
+
+  @NotNull
+  protected final ManagedTimeUnifiedStateImpl managedState = new ManagedTimeUnifiedStateImpl();
+
+  /**
+   * Map holding the result of processing a tuple (unique, duplicate, expired or error) until all previous
+   * tuples have been processed. This is used only when {@link #orderedOutput} is true.
+   */
+  private transient Map decisions;
+  private transient long sleepMillis;

[GitHub] apex-malhar pull request #335: [Review Only] APEXMALHAR-1701 Deduper with Ma...

2016-08-02 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73139012
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java ---
@@ -0,0 +1,462 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
+import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Futures;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Abstract class which allows de-duplicating incoming tuples based on a configured key.
+ * Also supports an expiry mechanism based on a configurable expiry period, configured using
+ * {@link TimeBucketAssigner} in {@link ManagedTimeUnifiedStateImpl}.
+ *
+ * @param <T> type of events
+ */
+@Evolving
+@OperatorAnnotation(checkpointableWithinAppWindow = false)
+public abstract class AbstractDeduper<T>
+implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener
+{
+  /**
+   * The input port on which events are received.
+   */
+  public final transient DefaultInputPort<T> input = new DefaultInputPort<T>()
+  {
+@Override
+public final void process(T tuple)
+{
+  processTuple(tuple);
+}
+  };
+
+  /**
+   * The output port on which deduped events are emitted.
+   */
+  public final transient DefaultOutputPort<T> unique = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which duplicate events are emitted.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<T> duplicate = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which expired events are emitted.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<T> expired = new DefaultOutputPort<>();
+
+  /**
+   * Whether or not the order of tuples should be maintained.
+   */
+  private boolean orderedOutput = false;
+
+  @NotNull
+  protected final ManagedTimeUnifiedStateImpl managedState = new ManagedTimeUnifiedStateImpl();
+
+  /**
+   * Map holding the result of processing a tuple (unique, duplicate, expired or error) until all previous
+   * tuples have been processed. This is used only when {@link #orderedOutput} is true.
+   */
+  private transient Map decisions;
+  private transient long sleepMillis;
+  private transient Map waitingEvents = Maps.newLinkedHashMap();
+  private transient Map asyncEvents = Maps.newLinkedHashMap();
+
+  // Metrics
+  @AutoMetric
+  private transient long uniqueEvents;
+  

[jira] [Commented] (APEXMALHAR-1701) Deduper : create a deduper backed by Managed State

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403768#comment-15403768
 ] 

ASF GitHub Bot commented on APEXMALHAR-1701:


Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73134653
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/dedup/DeduperStreamCodec.java 
---
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
+import com.datatorrent.lib.util.PojoUtils;
+import com.datatorrent.lib.util.PojoUtils.Getter;
+
--- End diff --

Can you add a javadoc comment describing how this works?


> Deduper : create a deduper backed by Managed State
> --
>
> Key: APEXMALHAR-1701
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-1701
> Project: Apache Apex Malhar
>  Issue Type: Task
>  Components: algorithms
>Reporter: Chandni Singh
>Assignee: Chandni Singh
>
> Need a de-duplicator operator that is based on Managed State.




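Regarding the javadoc request above: judging from the imports and from the partitioning test ("partitioning on the basis of the key in the tuple"), the codec presumably derives the partition value from the tuple's dedup key, so that every tuple with the same key reaches the same Deduper partition. Below is a dependency-free sketch of that idea. `KeyPartitionSketch` and `Event` are hypothetical, and a `java.util.function.Function` stands in for `PojoUtils.Getter`. How the platform maps the returned value onto a physical partition is not modeled.

```java
import java.util.Objects;
import java.util.function.Function;

class KeyPartitionSketch<T>
{
  // Hypothetical stand-in for a PojoUtils.Getter built from the key expression (e.g. "id").
  private final Function<T, Object> keyGetter;

  KeyPartitionSketch(Function<T, Object> keyGetter)
  {
    this.keyGetter = Objects.requireNonNull(keyGetter);
  }

  /** Plays the role of a stream codec's getPartition: the value depends only on the dedup key. */
  int getPartition(T tuple)
  {
    return Objects.hashCode(keyGetter.apply(tuple));
  }

  /** Hypothetical event type; only {@code id} is part of the key. */
  static final class Event
  {
    final int id;
    final long eventTime;

    Event(int id, long eventTime)
    {
      this.id = id;
      this.eventTime = eventTime;
    }
  }

  public static void main(String[] args)
  {
    KeyPartitionSketch<Event> codec = new KeyPartitionSketch<>(e -> e.id);
    int p1 = codec.getPartition(new Event(7, 1_000L));
    int p2 = codec.getPartition(new Event(7, 99_000L)); // same key, different time
    System.out.println(p1 == p2); // true
  }
}
```

Whatever the platform then does with the value, equal keys yield equal values. That property is what the `DeduperPartitioningTest` check relies on.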

[jira] [Commented] (APEXMALHAR-1701) Deduper : create a deduper backed by Managed State

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403767#comment-15403767
 ] 

ASF GitHub Bot commented on APEXMALHAR-1701:


Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73134312
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/dedup/DeduperStreamCodec.java 
---
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
+import com.datatorrent.lib.util.PojoUtils;
+import com.datatorrent.lib.util.PojoUtils.Getter;
+
+public class DeduperStreamCodec extends KryoSerializableStreamCodec
--- End diff --

Can you annotate this class with @Evolving?



[GitHub] apex-malhar pull request #335: [Review Only] APEXMALHAR-1701 Deduper with Ma...

2016-08-02 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73132546
  
--- Diff: library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java ---
@@ -0,0 +1,462 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
+import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Futures;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Abstract class which allows de-duplicating incoming tuples based on a configured key.
+ * Also supports expiry mechanism based on a configurable expiry period configured using {@link TimeBucketAssigner}
+ * in {@link ManagedTimeUnifiedStateImpl}
+ *
+ * @param <T> type of events
+ */
+@Evolving
+@OperatorAnnotation(checkpointableWithinAppWindow = false)
+public abstract class AbstractDeduper<T>
+    implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener
+{
+  /**
+   * The input port on which events are received.
+   */
+  public final transient DefaultInputPort<T> input = new DefaultInputPort<T>()
+  {
+    @Override
+    public final void process(T tuple)
+    {
+      processTuple(tuple);
+    }
+  };
+
+  /**
+   * The output port on which deduped events are emitted.
+   */
+  public final transient DefaultOutputPort<T> unique = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which duplicate events are emitted.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<T> duplicate = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which expired events are emitted.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<T> expired = new DefaultOutputPort<>();
+
+  /**
+   * Whether or not the order of tuples should be maintained.
+   */
+  private boolean orderedOutput = false;
+
+  @NotNull
+  protected final ManagedTimeUnifiedStateImpl managedState = new ManagedTimeUnifiedStateImpl();
+
+  /**
+   * Map to hold the result of processing a tuple (unique, duplicate, expired or error) until previous
+   * tuples get processed. This is used only when {@link #orderedOutput} is true.
+   */
+  private transient Map decisions;
+  private transient long sleepMillis;
+  private transient Map waitingEvents = Maps.newLinkedHashMap();
+  private transient Map asyncEvents = Maps.newLinkedHashMap();
+
+  // Metrics
+  @AutoMetric
+  private transient long uniqueEvents;
+  
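The `decisions` Javadoc quoted above says results are held until previous tuples get processed, and only when `orderedOutput` is true. A minimal sketch of that idea, assuming each tuple gets a sequence number on arrival and its decision may arrive later: decided tuples are released only from the head of an insertion-ordered map. `OrderedEmitterSketch`, `Slot`, and the `Decision` constants are hypothetical names; the PR's own map key and value types are not visible in the quote.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of holding decisions until earlier tuples are decided.
final class OrderedEmitterSketch<T> {
  enum Decision { UNIQUE, DUPLICATE, EXPIRED, ERROR }

  private static final class Slot<E> {
    final E tuple;
    Decision decision; // null while the lookup for this tuple is still pending
    Slot(E tuple) { this.tuple = tuple; }
  }

  private final boolean orderedOutput;
  // Keyed by arrival sequence, not by tuple: duplicates are equal tuples and would collide.
  private final Map<Long, Slot<T>> pending = new LinkedHashMap<>();
  private final List<String> emitted = new ArrayList<>();
  private long nextSeq;

  OrderedEmitterSketch(boolean orderedOutput) { this.orderedOutput = orderedOutput; }

  long arrive(T tuple) {
    long seq = nextSeq++;
    pending.put(seq, new Slot<>(tuple));
    return seq;
  }

  void decide(long seq, Decision decision) {
    Slot<T> slot = pending.get(seq);
    if (slot == null) {
      return; // unknown or already emitted
    }
    slot.decision = decision;
    if (!orderedOutput) {
      pending.remove(seq); // no ordering requirement: emit right away
      emit(slot);
      return;
    }
    // Release decided tuples from the head until an undecided one blocks the rest.
    Iterator<Slot<T>> it = pending.values().iterator();
    while (it.hasNext()) {
      Slot<T> head = it.next();
      if (head.decision == null) {
        break;
      }
      emit(head);
      it.remove();
    }
  }

  private void emit(Slot<T> slot) {
    // Stand-in for emitting on the unique/duplicate/expired ports.
    emitted.add(slot.decision + ":" + slot.tuple);
  }

  List<String> emitted() { return emitted; }
}
```

The trade-off is latency: with ordering on, one slow lookup holds back every tuple behind it.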

[jira] [Commented] (APEXMALHAR-1701) Deduper : create a deduper backed by Managed State

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403751#comment-15403751
 ] 

ASF GitHub Bot commented on APEXMALHAR-1701:


Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73132603
  
--- Diff: library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java ---
@@ -0,0 +1,462 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
+import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Futures;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Abstract class which allows de-duplicating incoming tuples based on a configured key.
+ * Also supports expiry mechanism based on a configurable expiry period configured using {@link TimeBucketAssigner}
+ * in {@link ManagedTimeUnifiedStateImpl}
+ *
+ * @param <T> type of events
+ */
+@Evolving
+@OperatorAnnotation(checkpointableWithinAppWindow = false)
+public abstract class AbstractDeduper<T>
+    implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener
+{
+  /**
+   * The input port on which events are received.
+   */
+  public final transient DefaultInputPort<T> input = new DefaultInputPort<T>()
+  {
+    @Override
+    public final void process(T tuple)
+    {
+      processTuple(tuple);
+    }
+  };
+
+  /**
+   * The output port on which deduped events are emitted.
+   */
+  public final transient DefaultOutputPort<T> unique = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which duplicate events are emitted.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<T> duplicate = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which expired events are emitted.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<T> expired = new DefaultOutputPort<>();
+
+  /**
+   * Whether or not the order of tuples should be maintained.
+   */
+  private boolean orderedOutput = false;
+
+  @NotNull
+  protected final ManagedTimeUnifiedStateImpl managedState = new ManagedTimeUnifiedStateImpl();
+
+  /**
+   * Map to hold the result of processing a tuple (unique, duplicate, expired or error) until previous
+   * tuples get processed. This is used only when {@link #orderedOutput} is true.
+   */
+  private transient Map decisions;
+  private transient long sleepMillis;

[jira] [Commented] (APEXMALHAR-1701) Deduper : create a deduper backed by Managed State

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403750#comment-15403750
 ] 

ASF GitHub Bot commented on APEXMALHAR-1701:


Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73132546
  
--- Diff: library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java ---
@@ -0,0 +1,462 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
+import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Futures;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Abstract class which allows de-duplicating incoming tuples based on a configured key.
+ * Also supports expiry mechanism based on a configurable expiry period configured using {@link TimeBucketAssigner}
+ * in {@link ManagedTimeUnifiedStateImpl}
+ *
+ * @param <T> type of events
+ */
+@Evolving
+@OperatorAnnotation(checkpointableWithinAppWindow = false)
+public abstract class AbstractDeduper<T>
+    implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener
+{
+  /**
+   * The input port on which events are received.
+   */
+  public final transient DefaultInputPort<T> input = new DefaultInputPort<T>()
+  {
+    @Override
+    public final void process(T tuple)
+    {
+      processTuple(tuple);
+    }
+  };
+
+  /**
+   * The output port on which deduped events are emitted.
+   */
+  public final transient DefaultOutputPort<T> unique = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which duplicate events are emitted.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<T> duplicate = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which expired events are emitted.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<T> expired = new DefaultOutputPort<>();
+
+  /**
+   * Whether or not the order of tuples should be maintained.
+   */
+  private boolean orderedOutput = false;
+
+  @NotNull
+  protected final ManagedTimeUnifiedStateImpl managedState = new ManagedTimeUnifiedStateImpl();
+
+  /**
+   * Map to hold the result of processing a tuple (unique, duplicate, expired or error) until previous
+   * tuples get processed. This is used only when {@link #orderedOutput} is true.
+   */
+  private transient Map decisions;
+  private transient long sleepMillis;

[GitHub] apex-malhar pull request #335: [Review Only] APEXMALHAR-1701 Deduper with Ma...

2016-08-02 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73132603
  
--- Diff: library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java ---
@@ -0,0 +1,462 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
+import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Futures;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Abstract class which allows de-duplicating incoming tuples based on a configured key.
+ * Also supports expiry mechanism based on a configurable expiry period configured using {@link TimeBucketAssigner}
+ * in {@link ManagedTimeUnifiedStateImpl}
+ *
+ * @param <T> type of events
+ */
+@Evolving
+@OperatorAnnotation(checkpointableWithinAppWindow = false)
+public abstract class AbstractDeduper<T>
+    implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener
+{
+  /**
+   * The input port on which events are received.
+   */
+  public final transient DefaultInputPort<T> input = new DefaultInputPort<T>()
+  {
+    @Override
+    public final void process(T tuple)
+    {
+      processTuple(tuple);
+    }
+  };
+
+  /**
+   * The output port on which deduped events are emitted.
+   */
+  public final transient DefaultOutputPort<T> unique = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which duplicate events are emitted.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<T> duplicate = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which expired events are emitted.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<T> expired = new DefaultOutputPort<>();
+
+  /**
+   * Whether or not the order of tuples should be maintained.
+   */
+  private boolean orderedOutput = false;
+
+  @NotNull
+  protected final ManagedTimeUnifiedStateImpl managedState = new ManagedTimeUnifiedStateImpl();
+
+  /**
+   * Map to hold the result of processing a tuple (unique, duplicate, expired or error) until previous
+   * tuples get processed. This is used only when {@link #orderedOutput} is true.
+   */
+  private transient Map decisions;
+  private transient long sleepMillis;
+  private transient Map waitingEvents = Maps.newLinkedHashMap();
+  private transient Map asyncEvents = Maps.newLinkedHashMap();
+
+  // Metrics
+  @AutoMetric
+  private transient long uniqueEvents;
+  
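The quoted imports (`Future`, `Futures`, `ExecutionException`), the `Operator.IdleTimeHandler` interface, and the `asyncEvents` field suggest that state lookups complete asynchronously and that pending tuples are revisited when the operator is idle. That is an inference from the quoted fields, not the PR's code. A sketch under that assumption, with a `Future<Boolean>` meaning "key already seen" standing in for a managed-state lookup; `AsyncLookupSketch` and `Pending` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical sketch: tuples wait on asynchronous state lookups and are
// resolved later, e.g. from an idle-time callback.
final class AsyncLookupSketch {
  private static final class Pending {
    final String key;
    final Future<Boolean> seenBefore; // true if the key was already in state
    Pending(String key, Future<Boolean> seenBefore) {
      this.key = key;
      this.seenBefore = seenBefore;
    }
  }

  private final List<Pending> asyncEvents = new ArrayList<>(); // arrival order
  int unique;
  int duplicate;

  void enqueue(String key, Future<Boolean> seenBefore) {
    asyncEvents.add(new Pending(key, seenBefore));
  }

  // Resolve every finished lookup; unfinished ones stay queued (unordered output).
  void handleIdleTime() {
    Iterator<Pending> it = asyncEvents.iterator();
    while (it.hasNext()) {
      Pending p = it.next();
      if (!p.seenBefore.isDone()) {
        continue; // still loading; check again on the next idle call
      }
      try {
        if (p.seenBefore.get()) { // does not block: isDone() was checked above
          duplicate++;
        } else {
          unique++;
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
      } catch (ExecutionException e) {
        throw new RuntimeException("lookup failed for key " + p.key, e.getCause());
      }
      it.remove();
    }
  }

  int pendingCount() { return asyncEvents.size(); }
}
```

Skipping unfinished lookups with `continue` gives unordered output; an ordered variant would stop at the first unfinished entry instead.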

[jira] [Commented] (APEXMALHAR-1701) Deduper : create a deduper backed by Managed State

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403745#comment-15403745
 ] 

ASF GitHub Bot commented on APEXMALHAR-1701:


Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73132107
  
--- Diff: library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java ---
@@ -0,0 +1,462 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
+import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Futures;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Abstract class which allows de-duplicating incoming tuples based on a configured key.
+ * Also supports expiry mechanism based on a configurable expiry period configured using {@link TimeBucketAssigner}
--- End diff --

done
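The Javadoc under discussion describes de-duplicating on a configured key with a configurable expiry period. A minimal in-memory sketch of the three outcomes that map onto the `unique`, `duplicate`, and `expired` ports, assuming an event counts as expired when it is older than the latest event time seen minus an expiry window (compare the `expireBefore` property in `DeduperTimeBasedPOJOImpl`). That rule, the `HashMap` standing in for `ManagedTimeUnifiedStateImpl`, and the `DedupSketch` name are assumptions; this sketch never evicts keys, which is the memory problem managed state addresses.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory sketch; the operator in the PR keeps keys in managed state.
final class DedupSketch {
  enum Result { UNIQUE, DUPLICATE, EXPIRED }

  private final long expireBeforeMillis;
  private final Map<String, Long> firstSeen = new HashMap<>();
  private long maxEventTime = Long.MIN_VALUE;

  DedupSketch(long expireBeforeMillis) {
    this.expireBeforeMillis = expireBeforeMillis;
  }

  Result process(String key, long eventTimeMillis) {
    maxEventTime = Math.max(maxEventTime, eventTimeMillis);
    // Assumed expiry rule: older than (latest event time - expiry window).
    if (eventTimeMillis < maxEventTime - expireBeforeMillis) {
      return Result.EXPIRED;
    }
    // putIfAbsent returns the earlier value if the key was already recorded.
    return firstSeen.putIfAbsent(key, eventTimeMillis) == null ? Result.UNIQUE : Result.DUPLICATE;
  }
}
```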



[GitHub] apex-malhar pull request #335: [Review Only] APEXMALHAR-1701 Deduper with Ma...

2016-08-02 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73132123
  
--- Diff: library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java ---
@@ -0,0 +1,463 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
+import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Futures;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Abstract class which allows de-duplicating incoming tuples based on a configured key.
+ * Also supports expiry mechanism based on a configurable expiry period configured using {@link TimeBucketAssigner}
+ * in {@link ManagedTimeUnifiedStateImpl}
+ *
+ * @param <T> type of events
+ */
+@Evolving
+@OperatorAnnotation(checkpointableWithinAppWindow = false)
+public abstract class AbstractDeduper<T>
--- End diff --

done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (APEXMALHAR-1701) Deduper : create a deduper backed by Managed State

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403623#comment-15403623
 ] 

ASF GitHub Bot commented on APEXMALHAR-1701:


Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73115138
  
--- Diff: library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java ---
@@ -0,0 +1,462 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
+import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Futures;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Abstract class which allows de-duplicating incoming tuples based on a configured key.
+ * Also supports expiry mechanism based on a configurable expiry period configured using {@link TimeBucketAssigner}
+ * in {@link ManagedTimeUnifiedStateImpl}
+ *
+ * @param <T> type of events
+ */
+@Evolving
+@OperatorAnnotation(checkpointableWithinAppWindow = false)
+public abstract class AbstractDeduper<T>
+    implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener
+{
+  /**
+   * The input port on which events are received.
+   */
+  public final transient DefaultInputPort<T> input = new DefaultInputPort<T>()
+  {
+    @Override
+    public final void process(T tuple)
+    {
+      processTuple(tuple);
+    }
+  };
+
+  /**
+   * The output port on which deduped events are emitted.
+   */
+  public final transient DefaultOutputPort<T> unique = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which duplicate events are emitted.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<T> duplicate = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which expired events are emitted.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<T> expired = new DefaultOutputPort<>();
+
+  /**
+   * Whether or not the order of tuples should be maintained.
+   */
+  private boolean orderedOutput = false;
+
+  @NotNull
+  protected final ManagedTimeUnifiedStateImpl managedState = new ManagedTimeUnifiedStateImpl();
+
+  /**
+   * Map to hold the result of processing a tuple (unique, duplicate, expired or error) until previous
+   * tuples get processed. This is used only when {@link #orderedOutput} is true.
+   */
+  private transient Map decisions;
+  private transient long sleepMillis;
 

[GitHub] apex-malhar pull request #335: [Review Only] APEXMALHAR-1701 Deduper with Ma...

2016-08-02 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73115146
  
--- Diff: library/src/main/java/org/apache/apex/malhar/lib/dedup/DeduperTimeBasedPOJOImpl.java ---
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import javax.validation.constraints.NotNull;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.lib.util.PojoUtils;
+import com.datatorrent.lib.util.PojoUtils.Getter;
+import com.datatorrent.netlet.util.Slice;
+
+@Evolving
+public class DeduperTimeBasedPOJOImpl extends AbstractDeduper implements ActivationListener
+{
+
+  // Required properties
+  @NotNull
+  private String keyExpression;
+
+  private String timeExpression;
+
+  @NotNull
+  private long bucketSpan;
+
+  @NotNull
+  private long expireBefore;
+
+  // Optional
+  private long referenceInstant = -1;
--- End diff --

It is a property for the user to supply in seconds. We convert it to an instant internally.
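The reply says `referenceInstant` is supplied in seconds and converted to an instant internally. The following is a minimal sketch of that kind of conversion, not the operator's code. Two assumptions: a negative value (such as the `-1` default) means "not configured" and falls back to a caller-supplied time, and `java.time.Instant` stands in for the `org.joda.time.Instant` the operator imports. The class and method names are hypothetical.

```java
import java.time.Instant;

/**
 * Hypothetical helper (not part of the operator): converts a seconds-based
 * property such as referenceInstant into an Instant.
 */
public final class ReferenceInstantSketch
{
  private ReferenceInstantSketch()
  {
  }

  /**
   * @param referenceInstantSeconds epoch time in seconds as supplied by the user
   * @param fallback instant to use when the property is not configured
   * @return the configured instant, or the fallback
   */
  public static Instant toReferenceInstant(long referenceInstantSeconds, Instant fallback)
  {
    // Assumption: a negative value, such as the -1 default, means "not configured".
    if (referenceInstantSeconds < 0) {
      return fallback;
    }
    return Instant.ofEpochSecond(referenceInstantSeconds);
  }
}
```

For example, `toReferenceInstant(1470096000L, Instant.now())` yields `2016-08-02T00:00:00Z`. Joda-Time works in milliseconds, so the Joda equivalent would be `new Instant(seconds * 1000)`.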




[jira] [Commented] (APEXMALHAR-1701) Deduper : create a deduper backed by Managed State

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403624#comment-15403624
 ] 

ASF GitHub Bot commented on APEXMALHAR-1701:


Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73115146
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/dedup/DeduperTimeBasedPOJOImpl.java
 ---
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import javax.validation.constraints.NotNull;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.lib.util.PojoUtils;
+import com.datatorrent.lib.util.PojoUtils.Getter;
+import com.datatorrent.netlet.util.Slice;
+
+@Evolving
+public class DeduperTimeBasedPOJOImpl extends AbstractDeduper implements ActivationListener
+{
+
+  // Required properties
+  @NotNull
+  private String keyExpression;
+
+  private String timeExpression;
+
+  @NotNull
+  private long bucketSpan;
+
+  @NotNull
+  private long expireBefore;
+
+  // Optional
+  private long referenceInstant = -1;
--- End diff --

It is a property for the user to supply in seconds. We convert it to an instant internally.
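The quoted fields (`bucketSpan`, `expireBefore`, `referenceInstant`) drive time-based expiry. The sketch below is a hypothetical model of how they could relate to an event's time. It is not `TimeBucketAssigner`'s algorithm. All values are in seconds, and the reference time is held fixed for simplicity, whereas in a running operator it would presumably advance.

```java
/**
 * Hypothetical model of time-based expiry and bucketing; all values are in seconds.
 * It is not TimeBucketAssigner's algorithm, only an illustration of how
 * bucketSpan, expireBefore and a reference time can relate to an event time.
 */
public final class ExpirySketch
{
  private final long bucketSpan;
  private final long expireBefore;
  private final long reference;

  public ExpirySketch(long bucketSpan, long expireBefore, long reference)
  {
    if (bucketSpan <= 0 || expireBefore < 0) {
      throw new IllegalArgumentException("bucketSpan must be > 0 and expireBefore >= 0");
    }
    this.bucketSpan = bucketSpan;
    this.expireBefore = expireBefore;
    this.reference = reference;
  }

  /** Oldest event time that is still retained for duplicate checks. */
  public long boundary()
  {
    return reference - expireBefore;
  }

  /** In this model, an event older than the boundary would go to the "expired" output. */
  public boolean isExpired(long eventTime)
  {
    return eventTime < boundary();
  }

  /** Fixed-width bucket index counted from the boundary, or -1 for expired events. */
  public long bucketOf(long eventTime)
  {
    return isExpired(eventTime) ? -1 : (eventTime - boundary()) / bucketSpan;
  }
}
```

With `bucketSpan = 60`, `expireBefore = 3600` and a reference of `10000`, events before `6400` are expired. Events at `6400` through `6459` land in bucket `0`, and `6460` starts bucket `1`.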


> Deduper : create a deduper backed by Managed State
> --
>
> Key: APEXMALHAR-1701
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-1701
> Project: Apache Apex Malhar
>  Issue Type: Task
>  Components: algorithms
>Reporter: Chandni Singh
>Assignee: Chandni Singh
>
> Need a de-duplicator operator that is based on Managed State.





[jira] [Commented] (APEXMALHAR-1701) Deduper : create a deduper backed by Managed State

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403615#comment-15403615
 ] 

ASF GitHub Bot commented on APEXMALHAR-1701:


Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73114470
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/dedup/DeduperTimeBasedPOJOImpl.java
 ---
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import javax.validation.constraints.NotNull;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.lib.util.PojoUtils;
+import com.datatorrent.lib.util.PojoUtils.Getter;
+import com.datatorrent.netlet.util.Slice;
+
+@Evolving
+public class DeduperTimeBasedPOJOImpl extends AbstractDeduper implements ActivationListener
--- End diff --

Done


> Deduper : create a deduper backed by Managed State
> --
>
> Key: APEXMALHAR-1701
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-1701
> Project: Apache Apex Malhar
>  Issue Type: Task
>  Components: algorithms
>Reporter: Chandni Singh
>Assignee: Chandni Singh
>
> Need a de-duplicator operator that is based on Managed State.





[GitHub] apex-malhar pull request #335: [Review Only] APEXMALHAR-1701 Deduper with Ma...

2016-08-02 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73114470
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/dedup/DeduperTimeBasedPOJOImpl.java
 ---
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import javax.validation.constraints.NotNull;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.lib.util.PojoUtils;
+import com.datatorrent.lib.util.PojoUtils.Getter;
+import com.datatorrent.netlet.util.Slice;
+
+@Evolving
+public class DeduperTimeBasedPOJOImpl extends AbstractDeduper implements ActivationListener
--- End diff --

Done




[jira] [Commented] (APEXMALHAR-1701) Deduper : create a deduper backed by Managed State

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403597#comment-15403597
 ] 

ASF GitHub Bot commented on APEXMALHAR-1701:


Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73112564
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java ---
@@ -0,0 +1,462 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
+import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Futures;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Abstract class which allows de-duplicating incoming tuples based on a configured key.
+ * Also supports an expiry mechanism based on a configurable expiry period, configured using
+ * {@link TimeBucketAssigner} in {@link ManagedTimeUnifiedStateImpl}.
+ *
+ * @param <T> type of events
+ */
+@Evolving
+@OperatorAnnotation(checkpointableWithinAppWindow = false)
+public abstract class AbstractDeduper<T>
+    implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener
+{
+  /**
+   * The input port on which events are received.
+   */
+  public final transient DefaultInputPort<T> input = new DefaultInputPort<T>()
+  {
+    @Override
+    public final void process(T tuple)
+    {
+      processTuple(tuple);
+    }
+  };
+
+  /**
+   * The output port on which deduped events are emitted.
+   */
+  public final transient DefaultOutputPort<T> unique = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which duplicate events are emitted.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<T> duplicate = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which expired events are emitted.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<T> expired = new DefaultOutputPort<>();
+
+  /**
+   * Whether or not the order of tuples should be maintained
+   */
+  private boolean orderedOutput = false;
+
+  @NotNull
+  protected final ManagedTimeUnifiedStateImpl managedState = new ManagedTimeUnifiedStateImpl();
+
+  /**
+   * Map to hold the result of processing a tuple (unique, duplicate, expired or error) until previous
+   * tuples get processed. This is used only when {@link #orderedOutput} is true.
+   */
+  private transient Map decisions;
+  private transient long sleepMillis;
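The `decisions` map holds each tuple's outcome until earlier tuples are processed, so that output stays in arrival order when `orderedOutput` is true. Below is a minimal, hypothetical sketch of that idea: decisions may complete out of order, but results are released only from the head of an insertion-ordered map. The element types of the real map were lost in the archive. Keying by an arrival sequence number here is an assumption, chosen so that equal (duplicate) tuples do not collide.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch of ordered output: decisions may be resolved out of order,
 * but results are released only in arrival order.
 */
public final class OrderedDecisionsSketch<T>
{
  public enum Decision
  {
    UNIQUE, DUPLICATE, EXPIRED
  }

  /** A tuple waiting for its decision; decision stays null until resolved. */
  private static final class Pending<E>
  {
    final E tuple;
    Decision decision;

    Pending(E tuple)
    {
      this.tuple = tuple;
    }
  }

  // Keyed by arrival sequence number so equal (duplicate) tuples do not collide;
  // LinkedHashMap iterates in insertion, i.e. arrival, order.
  private final Map<Long, Pending<T>> pending = new LinkedHashMap<>();
  private final List<String> emitted = new ArrayList<>();
  private long nextId = 0;

  /** Registers an arriving tuple and returns its sequence number. */
  public long arrive(T tuple)
  {
    long id = nextId++;
    pending.put(id, new Pending<>(tuple));
    return id;
  }

  /** Records the decision for a tuple and releases every resolved tuple at the head. */
  public void resolve(long id, Decision decision)
  {
    Pending<T> p = pending.get(id);
    if (p == null) {
      throw new IllegalArgumentException("unknown or already emitted id " + id);
    }
    p.decision = decision;
    Iterator<Pending<T>> it = pending.values().iterator();
    while (it.hasNext()) {
      Pending<T> head = it.next();
      if (head.decision == null) {
        break; // an earlier tuple is still undecided, so later ones must wait
      }
      emitted.add(head.decision + ":" + head.tuple);
      it.remove();
    }
  }

  /** What would have been emitted, in order, as "DECISION:tuple" strings. */
  public List<String> emitted()
  {
    return emitted;
  }
}
```

Resolving a later tuple first emits nothing. Once the first tuple is resolved, both are released in arrival order.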
 

[GitHub] apex-malhar pull request #335: [Review Only] APEXMALHAR-1701 Deduper with Ma...

2016-08-02 Thread chinmaykolhatkar
Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73112564
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java ---
@@ -0,0 +1,462 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
+import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Futures;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Abstract class which allows de-duplicating incoming tuples based on a configured key.
+ * Also supports an expiry mechanism based on a configurable expiry period, configured using
+ * {@link TimeBucketAssigner} in {@link ManagedTimeUnifiedStateImpl}.
+ *
+ * @param <T> type of events
+ */
+@Evolving
+@OperatorAnnotation(checkpointableWithinAppWindow = false)
+public abstract class AbstractDeduper<T>
+    implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener
+{
+  /**
+   * The input port on which events are received.
+   */
+  public final transient DefaultInputPort<T> input = new DefaultInputPort<T>()
+  {
+    @Override
+    public final void process(T tuple)
+    {
+      processTuple(tuple);
+    }
+  };
+
+  /**
+   * The output port on which deduped events are emitted.
+   */
+  public final transient DefaultOutputPort<T> unique = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which duplicate events are emitted.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<T> duplicate = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which expired events are emitted.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<T> expired = new DefaultOutputPort<>();
+
+  /**
+   * Whether or not the order of tuples should be maintained
+   */
+  private boolean orderedOutput = false;
+
+  @NotNull
+  protected final ManagedTimeUnifiedStateImpl managedState = new ManagedTimeUnifiedStateImpl();
+
+  /**
+   * Map to hold the result of processing a tuple (unique, duplicate, expired or error) until previous
+   * tuples get processed. This is used only when {@link #orderedOutput} is true.
+   */
+  private transient Map decisions;
+  private transient long sleepMillis;
+  private transient Map waitingEvents = Maps.newLinkedHashMap();
+  private transient Map asyncEvents = Maps.newLinkedHashMap();
+
+  // Metrics
+  @AutoMetric
+  private transient long uniqueEvents;
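This revision of the diff adds `waitingEvents` and `asyncEvents` maps and an `@AutoMetric` counter, and the class implements `Operator.IdleTimeHandler`. That suggests tuples whose managed-state lookup is still in flight are parked and finished later. The sketch below is a guess at that pattern, since the maps' element types were lost in the archive. It uses `String` tuples and a `Future<Boolean>` ("key seen before") purely for illustration. The class and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/**
 * Hypothetical sketch: tuples whose state lookup is still in flight are parked,
 * and completed lookups are drained when the operator is otherwise idle.
 */
public final class AsyncLookupSketch
{
  /** A parked tuple plus its pending lookup; the Boolean is true if the key was already seen. */
  private static final class Parked
  {
    final String tuple;
    final Future<Boolean> seenBefore;

    Parked(String tuple, Future<Boolean> seenBefore)
    {
      this.tuple = tuple;
      this.seenBefore = seenBefore;
    }
  }

  private final List<Parked> asyncEvents = new ArrayList<>();
  private final List<String> unique = new ArrayList<>();
  private final List<String> duplicate = new ArrayList<>();
  private long uniqueEvents; // plain counter standing in for an @AutoMetric field

  public void park(String tuple, Future<Boolean> seenBefore)
  {
    asyncEvents.add(new Parked(tuple, seenBefore));
  }

  /** Meant to be called from an idle-time callback; never blocks on unfinished lookups. */
  public void handleIdleTime()
  {
    Iterator<Parked> it = asyncEvents.iterator();
    while (it.hasNext()) {
      Parked p = it.next();
      if (!p.seenBefore.isDone()) {
        continue; // still loading, check again on the next idle call
      }
      try {
        if (p.seenBefore.get()) {
          duplicate.add(p.tuple);
        } else {
          unique.add(p.tuple);
          uniqueEvents++;
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      } catch (ExecutionException e) {
        throw new IllegalStateException("state lookup failed for " + p.tuple, e);
      }
      it.remove();
    }
  }

  public List<String> unique() { return unique; }

  public List<String> duplicate() { return duplicate; }

  public long uniqueEvents() { return uniqueEvents; }

  public int pendingCount() { return asyncEvents.size(); }
}
```

Unlike the ordered case, this drains whichever lookups have finished, so output order can differ from arrival order.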

[jira] [Commented] (APEXMALHAR-1701) Deduper : create a deduper backed by Managed State

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403513#comment-15403513
 ] 

ASF GitHub Bot commented on APEXMALHAR-1701:


Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73104101
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java ---
@@ -0,0 +1,462 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
+import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Futures;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Abstract class which allows de-duplicating incoming tuples based on a configured key.
+ * Also supports an expiry mechanism based on a configurable expiry period, configured using
+ * {@link TimeBucketAssigner} in {@link ManagedTimeUnifiedStateImpl}.
+ *
+ * @param <T> type of events
+ */
+@Evolving
+@OperatorAnnotation(checkpointableWithinAppWindow = false)
+public abstract class AbstractDeduper<T>
+    implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener
+{
+  /**
+   * The input port on which events are received.
+   */
+  public final transient DefaultInputPort<T> input = new DefaultInputPort<T>()
+  {
+    @Override
+    public final void process(T tuple)
+    {
+      processTuple(tuple);
+    }
+  };
+
+  /**
+   * The output port on which deduped events are emitted.
+   */
+  public final transient DefaultOutputPort<T> unique = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which duplicate events are emitted.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<T> duplicate = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which expired events are emitted.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<T> expired = new DefaultOutputPort<>();
+
+  /**
+   * Whether or not the order of tuples should be maintained
+   */
+  private boolean orderedOutput = false;
+
+  @NotNull
+  protected final ManagedTimeUnifiedStateImpl managedState = new ManagedTimeUnifiedStateImpl();
+
+  /**
+   * Map to hold the result of processing a tuple (unique, duplicate, expired or error) until previous
+   * tuples get processed. This is used only when {@link #orderedOutput} is true.
+   */
+  private transient Map decisions;
+  private transient long sleepMillis;
 

[jira] [Commented] (APEXMALHAR-1701) Deduper : create a deduper backed by Managed State

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403502#comment-15403502
 ] 

ASF GitHub Bot commented on APEXMALHAR-1701:


Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73103038
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java ---
@@ -0,0 +1,462 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
+import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Futures;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Abstract class which allows de-duplicating incoming tuples based on a configured key.
+ * Also supports an expiry mechanism based on a configurable expiry period, configured using
+ * {@link TimeBucketAssigner} in {@link ManagedTimeUnifiedStateImpl}.
+ *
+ * @param <T> type of events
+ */
+@Evolving
+@OperatorAnnotation(checkpointableWithinAppWindow = false)
+public abstract class AbstractDeduper<T>
+    implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener
+{
+  /**
+   * The input port on which events are received.
+   */
+  public final transient DefaultInputPort<T> input = new DefaultInputPort<T>()
+  {
+    @Override
+    public final void process(T tuple)
+    {
+      processTuple(tuple);
+    }
+  };
+
+  /**
+   * The output port on which deduped events are emitted.
+   */
+  public final transient DefaultOutputPort<T> unique = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which duplicate events are emitted.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<T> duplicate = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which expired events are emitted.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<T> expired = new DefaultOutputPort<>();
+
+  /**
+   * Whether or not the order of tuples should be maintained
+   */
+  private boolean orderedOutput = false;
+
+  @NotNull
+  protected final ManagedTimeUnifiedStateImpl managedState = new ManagedTimeUnifiedStateImpl();
+
+  /**
+   * Map to hold the result of processing a tuple (unique, duplicate, expired or error) until previous
+   * tuples get processed. This is used only when {@link #orderedOutput} is true.
+   */
+  private transient Map decisions;
+  private transient long sleepMillis;
 

[jira] [Commented] (APEXMALHAR-1701) Deduper : create a deduper backed by Managed State

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403500#comment-15403500
 ] 

ASF GitHub Bot commented on APEXMALHAR-1701:


Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73102939
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java ---
@@ -0,0 +1,462 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
+import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Futures;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Abstract class which allows de-duplicating incoming tuples based on a configured key.
+ * Also supports an expiry mechanism based on a configurable expiry period, configured using
+ * {@link TimeBucketAssigner} in {@link ManagedTimeUnifiedStateImpl}.
+ *
+ * @param <T> type of events
+ */
+@Evolving
+@OperatorAnnotation(checkpointableWithinAppWindow = false)
+public abstract class AbstractDeduper<T>
+    implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener
+{
+  /**
+   * The input port on which events are received.
+   */
+  public final transient DefaultInputPort<T> input = new DefaultInputPort<T>()
+  {
+    @Override
+    public final void process(T tuple)
+    {
+      processTuple(tuple);
+    }
+  };
+
+  /**
+   * The output port on which deduped events are emitted.
+   */
+  public final transient DefaultOutputPort<T> unique = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which duplicate events are emitted.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<T> duplicate = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which expired events are emitted.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<T> expired = new DefaultOutputPort<>();
+
+  /**
+   * Whether or not the order of tuples should be maintained
+   */
+  private boolean orderedOutput = false;
+
+  @NotNull
+  protected final ManagedTimeUnifiedStateImpl managedState = new ManagedTimeUnifiedStateImpl();
+
+  /**
+   * Map to hold the result of processing a tuple (unique, duplicate, expired or error) until previous
+   * tuples get processed. This is used only when {@link #orderedOutput} is true.
+   */
+  private transient Map decisions;
+  private transient long sleepMillis;
 

[GitHub] apex-malhar pull request #335: [Review Only] APEXMALHAR-1701 Deduper with Ma...

2016-08-02 Thread chinmaykolhatkar
Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73102939
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java ---
@@ -0,0 +1,462 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
+import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Futures;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Abstract class which allows de-duplicating incoming tuples based on a configured key.
+ * Also supports an expiry mechanism based on a configurable expiry period, configured using {@link TimeBucketAssigner}
+ * in {@link ManagedTimeUnifiedStateImpl}.
+ *
+ * @param <T> type of events
+ */
+@Evolving
+@OperatorAnnotation(checkpointableWithinAppWindow = false)
+public abstract class AbstractDeduper<T>
+    implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener
+{
+  /**
+   * The input port on which events are received.
+   */
+  public final transient DefaultInputPort<T> input = new DefaultInputPort<T>()
+  {
+    @Override
+    public final void process(T tuple)
+    {
+      processTuple(tuple);
+    }
+  };
+
+  /**
+   * The output port on which deduped events are emitted.
+   */
+  public final transient DefaultOutputPort<T> unique = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which duplicate events are emitted.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<T> duplicate = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which expired events are emitted.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<T> expired = new DefaultOutputPort<>();
+
+  /**
+   * Whether or not the order of tuples should be maintained.
+   */
+  private boolean orderedOutput = false;
+
+  @NotNull
+  protected final ManagedTimeUnifiedStateImpl managedState = new ManagedTimeUnifiedStateImpl();
+
+  /**
+   * Map to hold the result of processing a tuple (unique, duplicate, expired or error) until previous
+   * tuples get processed. This is used only when {@link #orderedOutput} is true.
+   */
+  private transient Map decisions;
+  private transient long sleepMillis;
+  private transient Map waitingEvents = Maps.newLinkedHashMap();
+  private transient Map asyncEvents = Maps.newLinkedHashMap();
+
+  // Metrics
+  @AutoMetric
+  private transient long uniqueEvents;
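The `decisions` map in this diff holds per-tuple outcomes (unique, duplicate, expired or error) until earlier tuples are resolved, which is how `orderedOutput` can be honored. Below is a minimal, self-contained sketch of that idea; it is not the code under review, and `OrderedDecisionBuffer`, `Slot`, `arrive`, `decide` and `drain` are hypothetical names. As an assumption of the sketch, slots are keyed by an arrival sequence number so that two equal tuples (exactly the duplicate case) cannot overwrite each other.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch of ordered emission: each tuple gets a slot in arrival order,
 * decisions may arrive out of order, and only the fully decided prefix is released.
 */
class OrderedDecisionBuffer<T>
{
  enum Decision { PENDING, UNIQUE, DUPLICATE, EXPIRED, ERROR }

  /** A tuple together with its (possibly still pending) decision. */
  static final class Slot<T>
  {
    final T tuple;
    Decision decision = Decision.PENDING;

    Slot(T tuple)
    {
      this.tuple = tuple;
    }
  }

  // Insertion-ordered, keyed by arrival sequence number rather than by the tuple itself.
  private final Map<Long, Slot<T>> slots = new LinkedHashMap<>();
  private long nextSeq = 0;

  /** Registers an arriving tuple and returns its sequence number. */
  long arrive(T tuple)
  {
    long seq = nextSeq++;
    slots.put(seq, new Slot<>(tuple));
    return seq;
  }

  /** Records the outcome for a tuple; may be called in any order. */
  void decide(long seq, Decision decision)
  {
    slots.get(seq).decision = decision;
  }

  /** Removes and returns the longest prefix of decided slots, in arrival order. */
  List<Slot<T>> drain()
  {
    List<Slot<T>> ready = new ArrayList<>();
    Iterator<Slot<T>> it = slots.values().iterator();
    while (it.hasNext()) {
      Slot<T> slot = it.next();
      if (slot.decision == Decision.PENDING) {
        break;  // an earlier tuple is still undecided, so later ones must wait
      }
      ready.add(slot);
      it.remove();
    }
    return ready;
  }
}
```

Draining stops at the first pending slot; that is what preserves arrival order, at the cost of holding back later tuples whose decisions are already known.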

[jira] [Commented] (APEXMALHAR-1701) Deduper : create a deduper backed by Managed State

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403499#comment-15403499
 ] 

ASF GitHub Bot commented on APEXMALHAR-1701:


Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73102835
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java ---
@@ -0,0 +1,462 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
+import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Futures;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Abstract class which allows de-duplicating incoming tuples based on a configured key.
+ * Also supports an expiry mechanism based on a configurable expiry period, configured using {@link TimeBucketAssigner}
+ * in {@link ManagedTimeUnifiedStateImpl}.
+ *
+ * @param <T> type of events
+ */
+@Evolving
+@OperatorAnnotation(checkpointableWithinAppWindow = false)
+public abstract class AbstractDeduper<T>
+    implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener
+{
+  /**
+   * The input port on which events are received.
+   */
+  public final transient DefaultInputPort<T> input = new DefaultInputPort<T>()
+  {
+    @Override
+    public final void process(T tuple)
+    {
+      processTuple(tuple);
+    }
+  };
+
+  /**
+   * The output port on which deduped events are emitted.
+   */
+  public final transient DefaultOutputPort<T> unique = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which duplicate events are emitted.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<T> duplicate = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which expired events are emitted.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<T> expired = new DefaultOutputPort<>();
+
+  /**
+   * Whether or not the order of tuples should be maintained.
+   */
+  private boolean orderedOutput = false;
+
+  @NotNull
+  protected final ManagedTimeUnifiedStateImpl managedState = new ManagedTimeUnifiedStateImpl();
+
+  /**
+   * Map to hold the result of processing a tuple (unique, duplicate, expired or error) until previous
+   * tuples get processed. This is used only when {@link #orderedOutput} is true.
+   */
+  private transient Map decisions;
+  private transient long sleepMillis;

[GitHub] apex-malhar pull request #335: [Review Only] APEXMALHAR-1701 Deduper with Ma...

2016-08-02 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73102835
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java ---
@@ -0,0 +1,462 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
+import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Futures;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Abstract class which allows de-duplicating incoming tuples based on a configured key.
+ * Also supports an expiry mechanism based on a configurable expiry period, configured using {@link TimeBucketAssigner}
+ * in {@link ManagedTimeUnifiedStateImpl}.
+ *
+ * @param <T> type of events
+ */
+@Evolving
+@OperatorAnnotation(checkpointableWithinAppWindow = false)
+public abstract class AbstractDeduper<T>
+    implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener
+{
+  /**
+   * The input port on which events are received.
+   */
+  public final transient DefaultInputPort<T> input = new DefaultInputPort<T>()
+  {
+    @Override
+    public final void process(T tuple)
+    {
+      processTuple(tuple);
+    }
+  };
+
+  /**
+   * The output port on which deduped events are emitted.
+   */
+  public final transient DefaultOutputPort<T> unique = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which duplicate events are emitted.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<T> duplicate = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which expired events are emitted.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<T> expired = new DefaultOutputPort<>();
+
+  /**
+   * Whether or not the order of tuples should be maintained.
+   */
+  private boolean orderedOutput = false;
+
+  @NotNull
+  protected final ManagedTimeUnifiedStateImpl managedState = new ManagedTimeUnifiedStateImpl();
+
+  /**
+   * Map to hold the result of processing a tuple (unique, duplicate, expired or error) until previous
+   * tuples get processed. This is used only when {@link #orderedOutput} is true.
+   */
+  private transient Map decisions;
+  private transient long sleepMillis;
+  private transient Map waitingEvents = Maps.newLinkedHashMap();
+  private transient Map asyncEvents = Maps.newLinkedHashMap();
+
+  // Metrics
+  @AutoMetric
+  private transient long uniqueEvents;
+  
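The class javadoc says expiry is driven by a configurable period set through `TimeBucketAssigner` in `ManagedTimeUnifiedStateImpl`. The sketch below only illustrates the general time-bucketed dedup technique with an in-memory stand-in; it neither uses nor reproduces those classes, and `InMemoryExpiringDeduper`, `bucketSpanMillis` and `expireBeforeMillis` are hypothetical. It assumes that duplicates carry the same event time (so they land in the same bucket) and that "expired" means older than the newest event seen minus the expiry period.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.TreeMap;

/**
 * Hypothetical in-memory stand-in for time-bucketed dedup state; it does not use
 * TimeBucketAssigner or ManagedTimeUnifiedStateImpl.
 */
class InMemoryExpiringDeduper<K>
{
  enum Decision { UNIQUE, DUPLICATE, EXPIRED }

  private final long bucketSpanMillis;
  private final long expireBeforeMillis;
  // Bucket id -> keys seen in that bucket; sorted so expired buckets can be purged as a prefix.
  private final TreeMap<Long, Set<K>> buckets = new TreeMap<>();
  private long maxEventTime = Long.MIN_VALUE;

  InMemoryExpiringDeduper(long bucketSpanMillis, long expireBeforeMillis)
  {
    if (bucketSpanMillis <= 0 || expireBeforeMillis < 0) {
      throw new IllegalArgumentException("bucket span must be positive and expiry non-negative");
    }
    this.bucketSpanMillis = bucketSpanMillis;
    this.expireBeforeMillis = expireBeforeMillis;
  }

  /** Classifies one event by its key and event time. */
  Decision process(K key, long eventTime)
  {
    maxEventTime = Math.max(maxEventTime, eventTime);
    long boundary = maxEventTime - expireBeforeMillis;
    if (eventTime < boundary) {
      return Decision.EXPIRED;  // older than the expiry window relative to the newest event seen
    }
    // Drop buckets lying entirely before the boundary: any event mapping to them would be expired.
    buckets.headMap(Math.floorDiv(boundary, bucketSpanMillis)).clear();
    Set<K> keys = buckets.computeIfAbsent(Math.floorDiv(eventTime, bucketSpanMillis), b -> new HashSet<>());
    return keys.add(key) ? Decision.UNIQUE : Decision.DUPLICATE;
  }

  /** Number of buckets currently retained. */
  int bucketCount()
  {
    return buckets.size();
  }
}
```

Purging whole buckets keeps memory proportional to the expiry window rather than to the full stream, which is the main reason to bucket by time at all.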

[jira] [Commented] (APEXMALHAR-1701) Deduper : create a deduper backed by Managed State

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403492#comment-15403492
 ] 

ASF GitHub Bot commented on APEXMALHAR-1701:


Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73102463
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java ---
@@ -0,0 +1,462 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
+import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Futures;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Abstract class which allows de-duplicating incoming tuples based on a configured key.
+ * Also supports an expiry mechanism based on a configurable expiry period, configured using {@link TimeBucketAssigner}
+ * in {@link ManagedTimeUnifiedStateImpl}.
+ *
+ * @param <T> type of events
+ */
+@Evolving
+@OperatorAnnotation(checkpointableWithinAppWindow = false)
+public abstract class AbstractDeduper<T>
+    implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener
+{
+  /**
+   * The input port on which events are received.
+   */
+  public final transient DefaultInputPort<T> input = new DefaultInputPort<T>()
+  {
+    @Override
+    public final void process(T tuple)
+    {
+      processTuple(tuple);
+    }
+  };
+
+  /**
+   * The output port on which deduped events are emitted.
+   */
+  public final transient DefaultOutputPort<T> unique = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which duplicate events are emitted.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<T> duplicate = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which expired events are emitted.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<T> expired = new DefaultOutputPort<>();
+
+  /**
+   * Whether or not the order of tuples should be maintained.
+   */
+  private boolean orderedOutput = false;
+
+  @NotNull
+  protected final ManagedTimeUnifiedStateImpl managedState = new ManagedTimeUnifiedStateImpl();
+
+  /**
+   * Map to hold the result of processing a tuple (unique, duplicate, expired or error) until previous
+   * tuples get processed. This is used only when {@link #orderedOutput} is true.
+   */
+  private transient Map decisions;
+  private transient long sleepMillis;

[GitHub] apex-malhar pull request #335: [Review Only] APEXMALHAR-1701 Deduper with Ma...

2016-08-02 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73102463
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java ---
@@ -0,0 +1,462 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
+import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Futures;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Abstract class which allows de-duplicating incoming tuples based on a configured key.
+ * Also supports an expiry mechanism based on a configurable expiry period, configured using {@link TimeBucketAssigner}
+ * in {@link ManagedTimeUnifiedStateImpl}.
+ *
+ * @param <T> type of events
+ */
+@Evolving
+@OperatorAnnotation(checkpointableWithinAppWindow = false)
+public abstract class AbstractDeduper<T>
+    implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener
+{
+  /**
+   * The input port on which events are received.
+   */
+  public final transient DefaultInputPort<T> input = new DefaultInputPort<T>()
+  {
+    @Override
+    public final void process(T tuple)
+    {
+      processTuple(tuple);
+    }
+  };
+
+  /**
+   * The output port on which deduped events are emitted.
+   */
+  public final transient DefaultOutputPort<T> unique = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which duplicate events are emitted.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<T> duplicate = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which expired events are emitted.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<T> expired = new DefaultOutputPort<>();
+
+  /**
+   * Whether or not the order of tuples should be maintained.
+   */
+  private boolean orderedOutput = false;
+
+  @NotNull
+  protected final ManagedTimeUnifiedStateImpl managedState = new ManagedTimeUnifiedStateImpl();
+
+  /**
+   * Map to hold the result of processing a tuple (unique, duplicate, expired or error) until previous
+   * tuples get processed. This is used only when {@link #orderedOutput} is true.
+   */
+  private transient Map decisions;
+  private transient long sleepMillis;
+  private transient Map waitingEvents = Maps.newLinkedHashMap();
+  private transient Map asyncEvents = Maps.newLinkedHashMap();
+
+  // Metrics
+  @AutoMetric
+  private transient long uniqueEvents;
+  
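The `waitingEvents` field, together with the `Future` and `ExecutionException` imports and `Operator.IdleTimeHandler`, suggests that state lookups may complete asynchronously and be revisited later. A hedged sketch of that general pattern using only `java.util.concurrent` types follows; it is an assumption about intent, not the PR's logic. `PendingLookups`, `park` and `resolveCompleted` are hypothetical, and the `stopAtFirstPending` flag is an assumed way to model `orderedOutput`.

```java
import java.util.AbstractMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;

/** Hypothetical holder for tuples whose state lookups have not completed yet. */
class PendingLookups<T, V>
{
  // Arrival-ordered list (not a map keyed by tuple) so equal tuples stay distinct.
  private final LinkedList<Map.Entry<T, Future<V>>> waiting = new LinkedList<>();

  /** Parks a tuple together with its in-flight lookup. */
  void park(T tuple, Future<V> lookup)
  {
    waiting.add(new AbstractMap.SimpleImmutableEntry<>(tuple, lookup));
  }

  /**
   * Hands every completed lookup to onResult and removes it; when stopAtFirstPending
   * is true, stops at the first incomplete lookup so results stay in arrival order.
   */
  int resolveCompleted(boolean stopAtFirstPending, BiConsumer<T, V> onResult)
  {
    int resolved = 0;
    Iterator<Map.Entry<T, Future<V>>> it = waiting.iterator();
    while (it.hasNext()) {
      Map.Entry<T, Future<V>> entry = it.next();
      Future<V> lookup = entry.getValue();
      if (!lookup.isDone()) {
        if (stopAtFirstPending) {
          break;
        }
        continue;
      }
      V value;
      try {
        value = lookup.get();  // does not block: the future is already done
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
      } catch (ExecutionException e) {
        throw new RuntimeException("state lookup failed", e.getCause());
      }
      onResult.accept(entry.getKey(), value);
      it.remove();
      resolved++;
    }
    return resolved;
  }

  int pendingCount()
  {
    return waiting.size();
  }
}
```

Polling with `isDone()` instead of blocking on `get()` lets an idle-time callback make progress without stalling the operator thread.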

[jira] [Resolved] (APEXMALHAR-2153) Add user documentation for Enricher on apex docs

2016-08-02 Thread Bhupesh Chawda (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2153?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bhupesh Chawda resolved APEXMALHAR-2153.

   Resolution: Done
Fix Version/s: 3.5.0

Merged

> Add user documentation for Enricher on apex docs
> 
>
> Key: APEXMALHAR-2153
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2153
> Project: Apache Apex Malhar
>  Issue Type: Documentation
>Reporter: Chinmay Kolhatkar
>Assignee: Chinmay Kolhatkar
> Fix For: 3.5.0
>
>
> Add user documentation for Enricher on apex docs





[jira] [Commented] (APEXMALHAR-2153) Add user documentation for Enricher on apex docs

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403458#comment-15403458
 ] 

ASF GitHub Bot commented on APEXMALHAR-2153:


Github user asfgit closed the pull request at:

https://github.com/apache/apex-malhar/pull/353


> Add user documentation for Enricher on apex docs
> 
>
> Key: APEXMALHAR-2153
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2153
> Project: Apache Apex Malhar
>  Issue Type: Documentation
>Reporter: Chinmay Kolhatkar
>Assignee: Chinmay Kolhatkar
>
> Add user documentation for Enricher on apex docs





[GitHub] apex-malhar pull request #353: APEXMALHAR-2153 Adding user docs for POJOEnri...

2016-08-02 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/apex-malhar/pull/353

