[jira] [Commented] (APEXCORE-448) Make operator name available in OperatorContext
[ https://issues.apache.org/jira/browse/APEXCORE-448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15405344#comment-15405344 ]

ASF GitHub Bot commented on APEXCORE-448:
-----------------------------------------

Github user chandnisingh commented on the issue:

    https://github.com/apache/apex-core/pull/364

    Changed the method to ```getName()```.

> Make operator name available in OperatorContext
> -----------------------------------------------
>
>                 Key: APEXCORE-448
>                 URL: https://issues.apache.org/jira/browse/APEXCORE-448
>             Project: Apache Apex Core
>          Issue Type: Improvement
>            Reporter: Chandni Singh
>            Assignee: Chandni Singh
>
> Need name of the logical operator in the OperatorContext which can be used by
> WindowDataManager to create a unique path per logical operator.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
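The issue asks for the logical operator name so that a unique path can be built per logical operator. A minimal sketch of that idea follows. The class `OperatorStatePaths`, the method `forLogicalOperator`, and the `windowData` directory name are hypothetical and not taken from the Apex code base; in Apex the name would presumably come from the context's `getName()` mentioned in the comment.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Objects;

// Hypothetical helper, not part of Apex: derives a storage path that is
// unique per logical operator by using the operator name as a path segment.
final class OperatorStatePaths {
  // Assumed directory name, chosen only for this illustration.
  static final String STATE_DIR = "windowData";

  private OperatorStatePaths() {
  }

  // appPath: base directory of the application (assumption).
  // operatorName: the logical operator name, e.g. the value a context's getName() would return.
  static Path forLogicalOperator(String appPath, String operatorName) {
    Objects.requireNonNull(appPath, "application path");
    Objects.requireNonNull(operatorName, "operator name");
    return Paths.get(appPath, STATE_DIR, operatorName);
  }

  public static void main(String[] args) {
    // Two logical operators in the same application get different directories.
    System.out.println(forLogicalOperator("/user/apex/app_1", "parser"));
    System.out.println(forLogicalOperator("/user/apex/app_1", "dedup"));
  }
}
```

Because the name is used directly as a path segment, two logical operators with the same or an empty name would share a directory in this sketch.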
[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...
Github user chaithu14 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/330#discussion_r73282037

    --- Diff: library/src/main/java/com/datatorrent/lib/join/POJOInnerJoinOperator.java ---
    @@ -0,0 +1,247 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied. See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.join;
    +
    +import java.lang.reflect.Array;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.commons.lang3.ClassUtils;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.Operator;
    +import com.datatorrent.api.StreamCodec;
    +import com.datatorrent.api.annotation.InputPortFieldAnnotation;
    +import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
    +import com.datatorrent.lib.util.PojoUtils;
    +
    +/**
    + * Concrete implementation of AbstractManagedStateInnerJoinOperator and receives objects from both streams.
    + *
    + * @displayName POJO Inner Join Operator
    + * @tags join
    + */
    +public class POJOInnerJoinOperator extends AbstractManagedStateInnerJoinOperator
[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...
Github user chaithu14 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/330#discussion_r73282003

    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java ---
    @@ -0,0 +1,357 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied. See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.state.managed;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +
    +import javax.annotation.Nullable;
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.apex.malhar.lib.state.spillable.Spillable;
    +
    +import com.google.common.base.Preconditions;
    +import com.google.common.collect.Multimap;
    +import com.google.common.collect.Multiset;
    +
    +import com.datatorrent.api.StreamCodec;
    +import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * Concrete implementation of SpillableByteArrayListMultimap which is needed for join operator.
    + *
    + * Properties:
    + * isKeyContainsMultiValue: Specifies whether the key has multiple value or not.
    + * timeBucket: Specifies the lenght of the time bucket.
    + *
    + */
    +public class ManagedTimeStateMultiValue implements Spillable.SpillableByteArrayListMultimap
    +{
    +  private transient StreamCodec streamCodec = null;
    +  private boolean isKeyContainsMultiValue = false;
    +  private long timeBucket;
    +  @NotNull
    +  private ManagedTimeStateImpl store;
    +
    +  public ManagedTimeStateMultiValue()
    +  {
    +    if (streamCodec == null) {
    +      streamCodec = new KryoSerializableStreamCodec();
    +    }
    +  }
    +
    +  public ManagedTimeStateMultiValue(@NotNull ManagedTimeStateImpl store, boolean isPrimaryKey)
    +  {
    +    this();
    +    this.store = Preconditions.checkNotNull(store);
    +    this.isKeyContainsMultiValue = isPrimaryKey;
    +  }
    +
    +  /**
    +   * Return the list of values from the store
    +   * @param k given key
    +   * @return list of values
    +   */
    +  @Override
    +  public List get(@Nullable K k)
    +  {
    +    List value = null;
    +    Slice valueSlice = store.getSync(getBucketId(k), streamCodec.toByteArray(k));
    +    if (isKeyContainsMultiValue) {
    +      value = (List)streamCodec.fromByteArray(valueSlice);
    +    } else {
    +      if (valueSlice == null || valueSlice.length == 0 || valueSlice.buffer == null) {
    +        return null;
    +      }
    +      value = new ArrayList<>();
    +      value.add((V)streamCodec.fromByteArray(valueSlice));
    +    }
    +    return value;
    +  }
    +
    +  /**
    +   * Returns the Future form the store.
    +   * @param k given key
    +   * @return
    +   */
    +  public CompositeFuture getAsync(@Nullable K k)
    +  {
    +    return new CompositeFuture(store.getAsync(getBucketId(k), streamCodec.toByteArray(k)));
    +  }
    +
    +  @Override
    +  public Set keySet()
    +  {
    +    throw new UnsupportedOperationException();
    +  }
    +
    +  @Override
    +  public Multiset keys()
    +  {
    +    throw new UnsupportedOperationException();
    +  }
    +
    +  @Override
    +  public Collection values()
    +  {
    +    throw new UnsupportedOperationException();
    +  }
    +
    +  @Override
    +  public Collection > entries()
    +  {
    +    throw new UnsupportedOperationException();
    +  }
    +
    +  @Override
    +  public List removeAll(@Nullable Object o)
    +  {
    +    throw new UnsupportedOperationException();
    +  }
    +
    +  @Override
    +  public void clear()
    +  {
    +
    +  }
    +
    +  @Override
    +  public int size()
    +  {
    +    throw new UnsupportedOperationException();
    +  }
    +
    +  @Override
[jira] [Commented] (APEXCORE-448) Make operator name available in OperatorContext
[ https://issues.apache.org/jira/browse/APEXCORE-448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15405334#comment-15405334 ]

ASF GitHub Bot commented on APEXCORE-448:
-----------------------------------------

Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/364#discussion_r73281978

    --- Diff: engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java ---
    @@ -84,12 +87,13 @@ public void setIdleTimeout(long idleTimeout)
        * @param attributes the value of attributes
        * @param parentContext
        */
    -  public OperatorContext(int id, AttributeMap attributes, Context parentContext)
    +  public OperatorContext(int id, String operatorName, AttributeMap attributes, Context parentContext)
       {
         super(attributes, parentContext);
         this.lastProcessedWindowId = Stateless.WINDOW_ID;
         this.id = id;
         this.stateless = super.getValue(OperatorContext.STATELESS);
    +    this.operatorName = Preconditions.checkNotNull(operatorName, "operator name");
    --- End diff --

    I checked locally in an application test that an empty operator name is allowed in the DAG. I don't think we can restrict it here if adding an empty operator name is allowed in the DAG.

> Make operator name available in OperatorContext
> -----------------------------------------------
>
>                 Key: APEXCORE-448
>                 URL: https://issues.apache.org/jira/browse/APEXCORE-448
>             Project: Apache Apex Core
>          Issue Type: Improvement
>            Reporter: Chandni Singh
>            Assignee: Chandni Singh
>
> Need name of the logical operator in the OperatorContext which can be used by
> WindowDataManager to create a unique path per logical operator.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
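The point of the comment is that a null check rejects a missing name but still accepts an empty one. The sketch below illustrates that distinction. It uses `java.util.Objects.requireNonNull` as a JDK-only stand-in for Guava's `Preconditions.checkNotNull` from the diff, and the class `OperatorNameCheck` is hypothetical.

```java
import java.util.Objects;

// Hypothetical illustration of the constructor check discussed above.
// Objects.requireNonNull stands in for Guava's Preconditions.checkNotNull
// so that the example depends only on the JDK.
final class OperatorNameCheck {
  private OperatorNameCheck() {
  }

  // Rejects null with a NullPointerException; any non-null string passes,
  // including the empty string.
  static String requireName(String operatorName) {
    return Objects.requireNonNull(operatorName, "operator name");
  }

  public static void main(String[] args) {
    // An empty name is accepted by a pure null check.
    System.out.println("empty accepted: " + requireName("").isEmpty());
    try {
      requireName(null);
    } catch (NullPointerException e) {
      // The message passed to the check is carried by the exception.
      System.out.println("null rejected: " + e.getMessage());
    }
  }
}
```

Rejecting empty names as well would need an additional check, which is the restriction the comment argues against while the DAG itself allows empty names.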
[jira] [Commented] (APEXMALHAR-2100) Development of Inner Join Operator using Spillable Datastructures
[ https://issues.apache.org/jira/browse/APEXMALHAR-2100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15405331#comment-15405331 ]

ASF GitHub Bot commented on APEXMALHAR-2100:
--------------------------------------------

Github user chaithu14 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/330#discussion_r73281797

    --- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractManagedStateInnerJoinOperator.java ---
    @@ -0,0 +1,265 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied. See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.join;
    +
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.Future;
    +
    +import org.joda.time.Duration;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateImpl;
    +import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateMultiValue;
    +import org.apache.apex.malhar.lib.state.spillable.Spillable;
    +import org.apache.hadoop.fs.Path;
    +import com.google.common.collect.Maps;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.api.Operator;
    +import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
    +
    +/**
    + * An abstract implementation of inner join operator over Managed state which extends from
    + * AbstractInnerJoinOperator.
    + *
    + * Properties:
    + * noOfBuckets: Number of buckets required for Managed state.
    + * bucketSpanTime: Indicates the length of the time bucket.
    + */
    +public abstract class AbstractManagedStateInnerJoinOperator extends AbstractInnerJoinOperator implements
    +    Operator.CheckpointNotificationListener, Operator.CheckpointListener,Operator.IdleTimeHandler
    +{
    +  private static final transient Logger LOG = LoggerFactory.getLogger(AbstractManagedStateInnerJoinOperator.class);
    +  public static final String stateDir = "managedState";
    +  public static final String stream1State = "stream1Data";
    +  public static final String stream2State = "stream2Data";
    +  private transient long sleepMillis;
    +  private transient Map , Future> waitingEvents = Maps.newLinkedHashMap();
    +  private int noOfBuckets = 1;
    +  private Long bucketSpanTime;
    +  protected ManagedTimeStateImpl stream1Store;
    +  protected ManagedTimeStateImpl stream2Store;
    +
    +  /**
    +   * Create Managed states and stores for both the streams.
    +   */
    +  @Override
    +  public void createStores()
    +  {
    +    stream1Store = new ManagedTimeStateImpl();
    +    stream2Store = new ManagedTimeStateImpl();
    +    stream1Store.setNumBuckets(noOfBuckets);
    +    stream2Store.setNumBuckets(noOfBuckets);
    +    if (bucketSpanTime != null) {
    --- End diff --

    OK, I will make expiryTime mandatory. I had a different use case in mind, which is invalid in a streaming scenario. I will use a single expiry time for both streams.

> Development of Inner Join Operator using Spillable Datastructures
> -----------------------------------------------------------------
>
>                 Key: APEXMALHAR-2100
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2100
>             Project: Apache Apex Malhar
>          Issue Type: Task
>            Reporter: Chaitanya
>            Assignee: Chaitanya
>

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
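To make the "mandatory expiry time, shared by both streams" idea concrete, here is a generic sketch of time bucketing with a single expiry setting. It is not the Managed State implementation: the class `TimeBucketSketch`, its methods, and the millisecond units are all assumptions made for illustration.

```java
// Hypothetical sketch, not Apex code: one mandatory expiry time and one
// bucket span are validated up front and then applied to events from
// either stream.
final class TimeBucketSketch {
  private final long expiryMillis;
  private final long bucketSpanMillis;

  TimeBucketSketch(long expiryMillis, long bucketSpanMillis) {
    // "Mandatory" is modelled here as: both values must be positive.
    if (expiryMillis <= 0 || bucketSpanMillis <= 0) {
      throw new IllegalArgumentException("expiry time and bucket span must be positive");
    }
    this.expiryMillis = expiryMillis;
    this.bucketSpanMillis = bucketSpanMillis;
  }

  // Index of the time bucket that an event timestamp falls into.
  long bucketOf(long eventTimeMillis) {
    return Math.floorDiv(eventTimeMillis, bucketSpanMillis);
  }

  // An event is expired when it is older than the latest seen time minus the expiry time.
  boolean isExpired(long eventTimeMillis, long latestTimeMillis) {
    return eventTimeMillis < latestTimeMillis - expiryMillis;
  }

  public static void main(String[] args) {
    TimeBucketSketch buckets = new TimeBucketSketch(60_000L, 10_000L);
    System.out.println(buckets.bucketOf(25_000L));
    System.out.println(buckets.isExpired(1_000L, 70_000L));
  }
}
```

With one shared instance, tuples from both streams are expired against the same horizon, which is the simplification the comment settles on.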
[jira] [Commented] (APEXMALHAR-2100) Development of Inner Join Operator using Spillable Datastructures
[ https://issues.apache.org/jira/browse/APEXMALHAR-2100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15405330#comment-15405330 ]

ASF GitHub Bot commented on APEXMALHAR-2100:
--------------------------------------------

Github user chaithu14 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/330#discussion_r73281761

    --- Diff: library/src/main/java/com/datatorrent/lib/join/POJOInnerJoinOperator.java ---
    @@ -0,0 +1,247 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied. See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.join;
    +
    +import java.lang.reflect.Array;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.commons.lang3.ClassUtils;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.Operator;
    +import com.datatorrent.api.StreamCodec;
    +import com.datatorrent.api.annotation.InputPortFieldAnnotation;
    +import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
    +import com.datatorrent.lib.util.PojoUtils;
    +
    +/**
    + * Concrete implementation of AbstractManagedStateInnerJoinOperator and receives objects from both streams.
    + *
    + * @displayName POJO Inner Join Operator
    + * @tags join
    + */
    +public class POJOInnerJoinOperator extends AbstractManagedStateInnerJoinOperator
[GitHub] apex-core pull request #364: APEXCORE-448 Made operator name available in op...
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/364#discussion_r73281443

    --- Diff: engine/src/main/java/com/datatorrent/stram/engine/OperatorContext.java ---
    @@ -84,12 +87,13 @@ public void setIdleTimeout(long idleTimeout)
        * @param attributes the value of attributes
        * @param parentContext
        */
    -  public OperatorContext(int id, AttributeMap attributes, Context parentContext)
    +  public OperatorContext(int id, String operatorName, AttributeMap attributes, Context parentContext)
    --- End diff --

    A lot of tests create an OperatorContext instance without OperatorDeployInfo. OperatorDeployInfo does not expose methods to set the fields id, name, and attributes, so changing this would require many more changes.

---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
[jira] [Created] (APEXCORE-500) Allow specification of non-heap memory size
Pramod Immaneni created APEXCORE-500: Summary: Allow specification of non-heap memory size Key: APEXCORE-500 URL: https://issues.apache.org/jira/browse/APEXCORE-500 Project: Apache Apex Core Issue Type: Improvement Reporter: Pramod Immaneni Assignee: Pramod Immaneni Currently there are two mechanisms affecting the non-heap size, neither of which is direct. By default, it is 0.25 of the container size, with the maximum limited to 1GB. The other way is when the heap memory is explicitly set to a fixed value, in which case the remaining memory is available for non-heap. If a specific non-heap size greater than the default is desired, then today the heap size has to be specified for each container. This size could change from container to container based on the operator it contains, and the user has to calculate and set these heap sizes. Instead, it would be easier to have an option to directly specify one non-heap size.
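The arithmetic behind the two mechanisms described above can be sketched as follows. This is a rough illustration of the rules as stated in the issue (0.25 of the container size capped at 1GB, or container size minus an explicitly set heap); the class and method names are hypothetical and the code is not taken from Apex.

```java
// Sketch only: illustrates the sizing rules described in the issue, in megabytes.
final class NonHeapSizingSketch
{
  static final int MAX_DEFAULT_NON_HEAP_MB = 1024;

  // Default rule: a quarter of the container, but never more than 1GB.
  static int defaultNonHeapMb(int containerMb)
  {
    return Math.min(containerMb / 4, MAX_DEFAULT_NON_HEAP_MB);
  }

  // Indirect rule: with a fixed heap, non-heap is whatever remains in the container.
  static int nonHeapWithFixedHeapMb(int containerMb, int heapMb)
  {
    if (heapMb > containerMb) {
      throw new IllegalArgumentException("heap cannot exceed the container size");
    }
    return containerMb - heapMb;
  }

  // What the user has to compute today, per container, to reach a desired non-heap size.
  static int heapNeededForNonHeapMb(int containerMb, int desiredNonHeapMb)
  {
    if (desiredNonHeapMb > containerMb) {
      throw new IllegalArgumentException("non-heap cannot exceed the container size");
    }
    return containerMb - desiredNonHeapMb;
  }

  public static void main(String[] args)
  {
    System.out.println(defaultNonHeapMb(2048));             // quarter of a 2GB container
    System.out.println(defaultNonHeapMb(8192));             // capped at the 1GB maximum
    System.out.println(heapNeededForNonHeapMb(8192, 2048)); // heap to set for 2GB non-heap
    System.out.println(heapNeededForNonHeapMb(4096, 2048)); // different container, different heap
  }
}
```

The last two calls show the inconvenience the issue describes: the same desired non-heap size requires a different heap setting for every container size.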
[jira] [Created] (APEXMALHAR-2173) add a constraint validation for subject in JMSBase
Sanjay M Pujare created APEXMALHAR-2173: --- Summary: add a constraint validation for subject in JMSBase Key: APEXMALHAR-2173 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2173 Project: Apache Apex Malhar Issue Type: Improvement Reporter: Sanjay M Pujare Priority: Minor In com/datatorrent/lib/io/jms/JMSBase, add a NotNull constraint check for the member "subject".
Re: custom JAVA_HOME
How about allowing specification of all the environment variables supported by YARN that are non-final, as described below? http://atetric.com/atetric/javadoc/org.apache.hadoop/hadoop-yarn-api/0.23.3/org/apache/hadoop/yarn/api/ApplicationConstants.Environment.html Thanks On Tue, Aug 2, 2016 at 3:43 PM, Vlad Rozov wrote: > Should Apex add JAVA_HOME to DAGContext and allow application to specify > which JDK to use if there are multiple JDK installations on Hadoop cluster? > Yarn already supports custom JAVA_HOME (please see > https://issues.apache.org/jira/browse/YARN-2481). > > Vlad >
[jira] [Commented] (APEXCORE-495) Enhancing the configuration package to store apps
[ https://issues.apache.org/jira/browse/APEXCORE-495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15404942#comment-15404942 ] ASF GitHub Bot commented on APEXCORE-495: - Github user sandeshh commented on the issue: https://github.com/apache/apex-core/pull/360 @davidyan74 Please review. > Enhancing the configuration package to store apps > - > > Key: APEXCORE-495 > URL: https://issues.apache.org/jira/browse/APEXCORE-495 > Project: Apache Apex Core > Issue Type: Improvement >Reporter: Sandesh >Assignee: Sandesh > > Apex supports configuration packages, which separate the application package from the > actual configuration. (http://docs.datatorrent.com/configuration_packages/) > We want to enhance the configuration package by adding support to "add Apps" > (json format). > Use case: Multiple users share the same app package but have different > views of the golden copy of the app package. > Note: This feature is requested by an Apex user.
[GitHub] apex-core issue #360: APEXCORE-495 supporting apps in config package.
Github user sandeshh commented on the issue: https://github.com/apache/apex-core/pull/360 @davidyan74 Please review.
custom JAVA_HOME
Should Apex add JAVA_HOME to DAGContext and allow application to specify which JDK to use if there are multiple JDK installations on Hadoop cluster? Yarn already supports custom JAVA_HOME (please see https://issues.apache.org/jira/browse/YARN-2481). Vlad
[GitHub] apex-malhar pull request #335: [Review Only] APEXMALHAR-1701 Deduper with Ma...
Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/335#discussion_r73161100 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java --- @@ -0,0 +1,459 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.dedup; + +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import javax.validation.constraints.NotNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.state.BucketedState; +import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl; +import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner; +import org.apache.hadoop.classification.InterfaceStability.Evolving; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.Futures; + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.Operator.ActivationListener; +import com.datatorrent.api.annotation.OperatorAnnotation; +import com.datatorrent.lib.fileaccess.FileAccessFSImpl; +import com.datatorrent.lib.fileaccess.TFileImpl; +import com.datatorrent.netlet.util.Slice; + +/** + * Abstract class which allows de-duplicating incoming tuples based on a configured key. + * Also supports expiry mechanism based on a configurable expiry period configured using {@link TimeBucketAssigner} + * in {@link ManagedTimeUnifiedStateImpl} + * Following steps are used in identifying the state of a particular tuple: + * 1. Check if the time of the tuple falls in an expired bucket. If so, the tuple is expired + * 2. If the tuple is a valid event, it is checked in the store whether the same key already exists in the + * time bucket identified by the event time. If, so, the tuple is a duplicate. + * 3. Otherwise the tuple is a unique tuple. 
+ * + * @param type of events + */ +@Evolving +@OperatorAnnotation(checkpointableWithinAppWindow = false) +public abstract class AbstractDeduper +implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener +{ + /** + * The input port on which events are received. + */ + public final transient DefaultInputPort input = new DefaultInputPort() + { +@Override +public final void process(T tuple) +{ + processTuple(tuple); +} + }; + + /** + * The output port on which deduped events are emitted. + */ + public final transient DefaultOutputPort unique = new DefaultOutputPort<>(); + + /** + * The output port on which duplicate events are emitted. + */ + public final transient DefaultOutputPort duplicate = new DefaultOutputPort<>(); + + /** + * The output port on which expired events are emitted. + */ + public final transient DefaultOutputPort expired = new DefaultOutputPort<>(); + + /** + * Whether or not the order of tuples be maintained. + * Making this "true" might entail some cost in performance, but makes the operator idempotent. + */ + private boolean preserveTupleOrder = true; + + @NotNull + protected final ManagedTimeUnifiedStateImpl managedState = new ManagedTimeUnifiedStateImpl(); + + /** + * Map to hold the result of a tuple processing (unique, duplicate, expired or error) until previous + * tuples get processed. This is used only when {@link #preserveTupleOrder} is
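The three classification steps listed in the class Javadoc above (expired, duplicate, unique) can be sketched in a few lines. This is a simplified in-memory illustration only: it uses a plain `HashMap` instead of managed state, takes the current time as an argument, never evicts old buckets, and ignores tuple ordering and asynchronous reads. `DedupSketch`, `Decision`, and `classify` are hypothetical names, not part of the pull request.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Sketch only: in-memory stand-in for the expired / duplicate / unique decision.
final class DedupSketch
{
  enum Decision { EXPIRED, DUPLICATE, UNIQUE }

  private final long bucketSpanMillis;
  private final long expiryMillis;
  // Keys already seen, grouped by the time bucket of the event time.
  private final Map<Long, Set<String>> keysByBucket = new HashMap<>();

  DedupSketch(long bucketSpanMillis, long expiryMillis)
  {
    this.bucketSpanMillis = bucketSpanMillis;
    this.expiryMillis = expiryMillis;
  }

  Decision classify(String key, long eventTimeMillis, long nowMillis)
  {
    // Step 1: an event older than the expiry period falls in an expired bucket.
    if (eventTimeMillis < nowMillis - expiryMillis) {
      return Decision.EXPIRED;
    }
    // Step 2: look for the same key in the time bucket identified by the event time.
    long bucket = Math.floorDiv(eventTimeMillis, bucketSpanMillis);
    Set<String> keys = keysByBucket.computeIfAbsent(bucket, b -> new HashSet<>());
    // Step 3: add() returns false when the key was already present, so the tuple is a duplicate.
    return keys.add(key) ? Decision.UNIQUE : Decision.DUPLICATE;
  }

  public static void main(String[] args)
  {
    DedupSketch deduper = new DedupSketch(1000, 5000);
    System.out.println(deduper.classify("a", 10_000, 10_000)); // first time seen
    System.out.println(deduper.classify("a", 10_500, 10_600)); // same key, same bucket
    System.out.println(deduper.classify("a", 11_000, 11_000)); // same key, next bucket
    System.out.println(deduper.classify("b", 1_000, 11_000));  // older than the expiry period
  }
}
```

Note that, following the Javadoc wording, the duplicate check is scoped to the time bucket of the event, so the same key arriving in a different bucket is treated as unique in this sketch.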
[jira] [Commented] (APEXMALHAR-1701) Deduper : create a deduper backed by Managed State
[ https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15404032#comment-15404032 ] ASF GitHub Bot commented on APEXMALHAR-1701: Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/335#discussion_r73160693 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java --- @@ -0,0 +1,459 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.dedup; + +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import javax.validation.constraints.NotNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.state.BucketedState; +import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl; +import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner; +import org.apache.hadoop.classification.InterfaceStability.Evolving; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.Futures; + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.Operator.ActivationListener; +import com.datatorrent.api.annotation.OperatorAnnotation; +import com.datatorrent.lib.fileaccess.FileAccessFSImpl; +import com.datatorrent.lib.fileaccess.TFileImpl; +import com.datatorrent.netlet.util.Slice; + +/** + * Abstract class which allows de-duplicating incoming tuples based on a configured key. + * Also supports expiry mechanism based on a configurable expiry period configured using {@link TimeBucketAssigner} + * in {@link ManagedTimeUnifiedStateImpl} + * Following steps are used in identifying the state of a particular tuple: + * 1. Check if the time of the tuple falls in an expired bucket. If so, the tuple is expired + * 2. If the tuple is a valid event, it is checked in the store whether the same key already exists in the + * time bucket identified by the event time. If, so, the tuple is a duplicate. + * 3. Otherwise the tuple is a unique tuple. 
+ * + * @param type of events + */ +@Evolving +@OperatorAnnotation(checkpointableWithinAppWindow = false) +public abstract class AbstractDeduper +implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener +{ + /** + * The input port on which events are received. + */ + public final transient DefaultInputPort input = new DefaultInputPort() + { +@Override +public final void process(T tuple) +{ + processTuple(tuple); +} + }; + + /** + * The output port on which deduped events are emitted. + */ + public final transient DefaultOutputPort unique = new DefaultOutputPort<>(); + + /** + * The output port on which duplicate events are emitted. + */ + public final transient DefaultOutputPort duplicate = new DefaultOutputPort<>(); + + /** + * The output port on which expired events are emitted. + */ + public final transient DefaultOutputPort expired = new DefaultOutputPort<>(); + + /** + * Whether or not the order of tuples be maintained. + * Making this "true" might entail some cost in performance, but makes the operator idempotent. + */ + private boolean preserveTupleOrder = true; + + @NotNull + protected final ManagedTimeUnifiedStateImpl
[GitHub] apex-malhar pull request #335: [Review Only] APEXMALHAR-1701 Deduper with Ma...
Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/335#discussion_r73160600 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java --- @@ -0,0 +1,459 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.dedup; + +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import javax.validation.constraints.NotNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.state.BucketedState; +import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl; +import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner; +import org.apache.hadoop.classification.InterfaceStability.Evolving; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.Futures; + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.Operator.ActivationListener; +import com.datatorrent.api.annotation.OperatorAnnotation; +import com.datatorrent.lib.fileaccess.FileAccessFSImpl; +import com.datatorrent.lib.fileaccess.TFileImpl; +import com.datatorrent.netlet.util.Slice; + +/** + * Abstract class which allows de-duplicating incoming tuples based on a configured key. + * Also supports expiry mechanism based on a configurable expiry period configured using {@link TimeBucketAssigner} + * in {@link ManagedTimeUnifiedStateImpl} + * Following steps are used in identifying the state of a particular tuple: + * 1. Check if the time of the tuple falls in an expired bucket. If so, the tuple is expired + * 2. If the tuple is a valid event, it is checked in the store whether the same key already exists in the + * time bucket identified by the event time. If, so, the tuple is a duplicate. + * 3. Otherwise the tuple is a unique tuple. 
+ * + * @param type of events + */ +@Evolving +@OperatorAnnotation(checkpointableWithinAppWindow = false) +public abstract class AbstractDeduper +implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener +{ + /** + * The input port on which events are received. + */ + public final transient DefaultInputPort input = new DefaultInputPort() + { +@Override +public final void process(T tuple) +{ + processTuple(tuple); +} + }; + + /** + * The output port on which deduped events are emitted. + */ + public final transient DefaultOutputPort unique = new DefaultOutputPort<>(); + + /** + * The output port on which duplicate events are emitted. + */ + public final transient DefaultOutputPort duplicate = new DefaultOutputPort<>(); + + /** + * The output port on which expired events are emitted. + */ + public final transient DefaultOutputPort expired = new DefaultOutputPort<>(); + + /** + * Whether or not the order of tuples be maintained. + * Making this "true" might entail some cost in performance, but makes the operator idempotent. + */ + private boolean preserveTupleOrder = true; + + @NotNull + protected final ManagedTimeUnifiedStateImpl managedState = new ManagedTimeUnifiedStateImpl(); + + /** + * Map to hold the result of a tuple processing (unique, duplicate, expired or error) until previous + * tuples get processed. This is used only when {@link #preserveTupleOrder} is
[GitHub] apex-malhar pull request #335: [Review Only] APEXMALHAR-1701 Deduper with Ma...
Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/335#discussion_r73160639 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java --- @@ -0,0 +1,459 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.dedup; + +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import javax.validation.constraints.NotNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.state.BucketedState; +import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl; +import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner; +import org.apache.hadoop.classification.InterfaceStability.Evolving; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.Futures; + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.Operator.ActivationListener; +import com.datatorrent.api.annotation.OperatorAnnotation; +import com.datatorrent.lib.fileaccess.FileAccessFSImpl; +import com.datatorrent.lib.fileaccess.TFileImpl; +import com.datatorrent.netlet.util.Slice; + +/** + * Abstract class which allows de-duplicating incoming tuples based on a configured key. + * Also supports expiry mechanism based on a configurable expiry period configured using {@link TimeBucketAssigner} + * in {@link ManagedTimeUnifiedStateImpl} + * Following steps are used in identifying the state of a particular tuple: + * 1. Check if the time of the tuple falls in an expired bucket. If so, the tuple is expired + * 2. If the tuple is a valid event, it is checked in the store whether the same key already exists in the + * time bucket identified by the event time. If, so, the tuple is a duplicate. + * 3. Otherwise the tuple is a unique tuple. 
+ * + * @param <T> type of events + */ +@Evolving +@OperatorAnnotation(checkpointableWithinAppWindow = false) +public abstract class AbstractDeduper<T> +implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener +{ + /** + * The input port on which events are received. + */ + public final transient DefaultInputPort<T> input = new DefaultInputPort<T>() + { +@Override +public final void process(T tuple) +{ + processTuple(tuple); +} + }; + + /** + * The output port on which deduped events are emitted. + */ + public final transient DefaultOutputPort<T> unique = new DefaultOutputPort<>(); + + /** + * The output port on which duplicate events are emitted. + */ + public final transient DefaultOutputPort<T> duplicate = new DefaultOutputPort<>(); + + /** + * The output port on which expired events are emitted. + */ + public final transient DefaultOutputPort<T> expired = new DefaultOutputPort<>(); + + /** + * Whether or not the order of tuples be maintained. + * Making this "true" might entail some cost in performance, but makes the operator idempotent. + */ + private boolean preserveTupleOrder = true; + + @NotNull + protected final ManagedTimeUnifiedStateImpl managedState = new ManagedTimeUnifiedStateImpl(); + + /** + * Map to hold the result of a tuple processing (unique, duplicate, expired or error) until previous + * tuples get processed. This is used only when {@link #preserveTupleOrder} is
[GitHub] apex-malhar pull request #335: [Review Only] APEXMALHAR-1701 Deduper with Ma...
Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/335#discussion_r73160450 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java --- @@ -0,0 +1,459 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.dedup; + +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import javax.validation.constraints.NotNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.state.BucketedState; +import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl; +import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner; +import org.apache.hadoop.classification.InterfaceStability.Evolving; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.Futures; + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.Operator.ActivationListener; +import com.datatorrent.api.annotation.OperatorAnnotation; +import com.datatorrent.lib.fileaccess.FileAccessFSImpl; +import com.datatorrent.lib.fileaccess.TFileImpl; +import com.datatorrent.netlet.util.Slice; + +/** + * Abstract class which allows de-duplicating incoming tuples based on a configured key. + * Also supports expiry mechanism based on a configurable expiry period configured using {@link TimeBucketAssigner} + * in {@link ManagedTimeUnifiedStateImpl} + * Following steps are used in identifying the state of a particular tuple: + * 1. Check if the time of the tuple falls in an expired bucket. If so, the tuple is expired + * 2. If the tuple is a valid event, it is checked in the store whether the same key already exists in the + * time bucket identified by the event time. If, so, the tuple is a duplicate. + * 3. Otherwise the tuple is a unique tuple. 
+ * + * @param <T> type of events + */ +@Evolving +@OperatorAnnotation(checkpointableWithinAppWindow = false) +public abstract class AbstractDeduper<T> +implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener +{ + /** + * The input port on which events are received. + */ + public final transient DefaultInputPort<T> input = new DefaultInputPort<T>() + { +@Override +public final void process(T tuple) +{ + processTuple(tuple); +} + }; + + /** + * The output port on which deduped events are emitted. + */ + public final transient DefaultOutputPort<T> unique = new DefaultOutputPort<>(); + + /** + * The output port on which duplicate events are emitted. + */ + public final transient DefaultOutputPort<T> duplicate = new DefaultOutputPort<>(); + + /** + * The output port on which expired events are emitted. + */ + public final transient DefaultOutputPort<T> expired = new DefaultOutputPort<>(); + + /** + * Whether or not the order of tuples be maintained. + * Making this "true" might entail some cost in performance, but makes the operator idempotent. + */ + private boolean preserveTupleOrder = true; + + @NotNull + protected final ManagedTimeUnifiedStateImpl managedState = new ManagedTimeUnifiedStateImpl(); + + /** + * Map to hold the result of a tuple processing (unique, duplicate, expired or error) until previous + * tuples get processed. This is used only when {@link #preserveTupleOrder} is
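Editorial note: the class Javadoc quoted in the diff above describes three classification steps (expired, duplicate, unique). The following is a minimal sketch of that decision order, not the code under review. It assumes an in-memory map in place of ManagedTimeUnifiedStateImpl and a simple time comparison in place of the bucket expiry check; the names DedupSketch, Decision, classify, bucketSpanMillis and expireBeforeMillis are hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Illustrative only: an in-memory stand-in for the managed-state store.
class DedupSketch
{
  enum Decision { UNIQUE, DUPLICATE, EXPIRED }

  private final long bucketSpanMillis;   // hypothetical: width of one time bucket
  private final long expireBeforeMillis; // hypothetical: how far back events are still accepted
  // time bucket id -> keys already seen in that bucket
  private final Map<Long, Set<String>> buckets = new HashMap<>();

  DedupSketch(long bucketSpanMillis, long expireBeforeMillis)
  {
    this.bucketSpanMillis = bucketSpanMillis;
    this.expireBeforeMillis = expireBeforeMillis;
  }

  Decision classify(String key, long eventTimeMillis, long nowMillis)
  {
    // Step 1: an event older than the expiry horizon is expired.
    if (eventTimeMillis < nowMillis - expireBeforeMillis) {
      return Decision.EXPIRED;
    }
    // Step 2: look for the key in the bucket identified by the event time.
    long bucketId = eventTimeMillis / bucketSpanMillis;
    Set<String> keys = buckets.computeIfAbsent(bucketId, id -> new HashSet<>());
    if (!keys.add(key)) {
      // add() returns false when the key was already present in this bucket.
      return Decision.DUPLICATE;
    }
    // Step 3: otherwise the tuple is unique (and is now remembered).
    return Decision.UNIQUE;
  }

  public static void main(String[] args)
  {
    DedupSketch dedup = new DedupSketch(60_000L, 600_000L);
    long now = 1_000_000L;
    System.out.println(dedup.classify("a", now, now));
    System.out.println(dedup.classify("a", now, now));
    System.out.println(dedup.classify("a", now - 700_000L, now));
  }
}
```

The sketch answers synchronously; the operator in the diff additionally has to deal with asynchronous lookups and tuple ordering (see preserveTupleOrder), which this illustration leaves out.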
[jira] [Commented] (APEXMALHAR-1701) Deduper : create a deduper backed by Managed State
[ https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403976#comment-15403976 ] ASF GitHub Bot commented on APEXMALHAR-1701: Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/335#discussion_r73156746 --- Diff: library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperPartitioningTest.java --- @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.dedup; + +import java.util.Date; +import java.util.HashMap; +import java.util.Random; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; + +import com.google.common.collect.Maps; + +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.common.partitioner.StatelessPartitioner; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.io.ConsoleOutputOperator; + +/** + * Tests whether the operator functions correctly when partitioned + * The partitioning in Dedup is overridden by partitioning on basis of the key in the tuple. + * + */ +public class DeduperPartitioningTest +{ + public static final int NUM_DEDUP_PARTITIONS = 5; + private static boolean testFailed = false; + + /** + * Application to test the partitioning + * + */ + public static class TestDedupApp implements StreamingApplication + { +@Override +public void populateDAG(DAG dag, Configuration conf) +{ + TestGenerator gen = dag.addOperator("Generator", new TestGenerator()); + + TestDeduper dedup = dag.addOperator("Deduper", new TestDeduper()); + dedup.setKeyExpression("id"); + dedup.setTimeExpression("eventTime.getTime()"); + dedup.setBucketSpan(60); + dedup.setExpireBefore(600); + + ConsoleOutputOperator console = dag.addOperator("Console", new ConsoleOutputOperator()); + dag.addStream("Generator to Dedup", gen.output, dedup.input); + dag.addStream("Dedup to Console", dedup.unique, console.input); + dag.setInputPortAttribute(dedup.input, Context.PortContext.TUPLE_CLASS, TestEvent.class); + dag.setOutputPortAttribute(dedup.unique, Context.PortContext.TUPLE_CLASS, TestEvent.class); + dag.setAttribute(dedup, 
Context.OperatorContext.PARTITIONER, + new StatelessPartitioner(NUM_DEDUP_PARTITIONS)); +} + } + + public static class TestDeduper extends TimeBasedDedupOperator + { +int operatorId; +boolean started = false; +HashMap<Integer, Integer> partitionMap = Maps.newHashMap(); + +@Override +public void setup(OperatorContext context) +{ + super.setup(context); + operatorId = context.getId(); +} + +@Override +protected void processTuple(Object tuple) +{ + TestEvent event = (TestEvent)tuple; + if (partitionMap.containsKey(event.id)) { +if (partitionMap.get(event.id) != operatorId) { + testFailed = true; + throw new RuntimeException("Wrong tuple assignment"); +} + } else { +partitionMap.put(event.id, operatorId); + } +} + } + + public static class TestGenerator extends BaseOperator implements InputOperator + { + +public final transient DefaultOutputPort output = new DefaultOutputPort<>(); +private final transient Random r = new Random(); + +@Override +public void
[GitHub] apex-malhar pull request #335: [Review Only] APEXMALHAR-1701 Deduper with Ma...
Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/335#discussion_r73156746 --- Diff: library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperPartitioningTest.java --- @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.dedup; + +import java.util.Date; +import java.util.HashMap; +import java.util.Random; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; + +import com.google.common.collect.Maps; + +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.common.partitioner.StatelessPartitioner; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.io.ConsoleOutputOperator; + +/** + * Tests whether the operator functions correctly when partitioned + * The partitioning in Dedup is overridden by partitioning on basis of the key in the tuple. 
+ * + */ +public class DeduperPartitioningTest +{ + public static final int NUM_DEDUP_PARTITIONS = 5; + private static boolean testFailed = false; + + /** + * Application to test the partitioning + * + */ + public static class TestDedupApp implements StreamingApplication + { +@Override +public void populateDAG(DAG dag, Configuration conf) +{ + TestGenerator gen = dag.addOperator("Generator", new TestGenerator()); + + TestDeduper dedup = dag.addOperator("Deduper", new TestDeduper()); + dedup.setKeyExpression("id"); + dedup.setTimeExpression("eventTime.getTime()"); + dedup.setBucketSpan(60); + dedup.setExpireBefore(600); + + ConsoleOutputOperator console = dag.addOperator("Console", new ConsoleOutputOperator()); + dag.addStream("Generator to Dedup", gen.output, dedup.input); + dag.addStream("Dedup to Console", dedup.unique, console.input); + dag.setInputPortAttribute(dedup.input, Context.PortContext.TUPLE_CLASS, TestEvent.class); + dag.setOutputPortAttribute(dedup.unique, Context.PortContext.TUPLE_CLASS, TestEvent.class); + dag.setAttribute(dedup, Context.OperatorContext.PARTITIONER, + new StatelessPartitioner(NUM_DEDUP_PARTITIONS)); +} + } + + public static class TestDeduper extends TimeBasedDedupOperator + { +int operatorId; +boolean started = false; +HashMap<Integer, Integer> partitionMap = Maps.newHashMap(); + +@Override +public void setup(OperatorContext context) +{ + super.setup(context); + operatorId = context.getId(); +} + +@Override +protected void processTuple(Object tuple) +{ + TestEvent event = (TestEvent)tuple; + if (partitionMap.containsKey(event.id)) { +if (partitionMap.get(event.id) != operatorId) { + testFailed = true; + throw new RuntimeException("Wrong tuple assignment"); +} + } else { +partitionMap.put(event.id, operatorId); + } +} + } + + public static class TestGenerator extends BaseOperator implements InputOperator + { + +public final transient DefaultOutputPort output = new DefaultOutputPort<>(); +private final transient Random r = new Random(); + +@Override 
+public void emitTuples() +{ + TestEvent event = new TestEvent(); + event.id = r.nextInt(100); + output.emit(event); +} + } + + public static class TestEvent + { +private int id; +private Date
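Editorial note: the test quoted above checks one invariant, namely that every tuple with a given key is always seen by the same partition. A minimal sketch of that invariant follows. It assumes a plain hash-modulo assignment, which is only a stand-in and not a description of how Apex routes tuples; KeyPartitionSketch, partitionFor and stableAssignment are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: hash-modulo routing used as a stand-in for key-based partitioning.
class KeyPartitionSketch
{
  // Maps a key to a partition index in [0, numPartitions).
  static int partitionFor(int key, int numPartitions)
  {
    // floorMod keeps the result non-negative even for negative hash codes.
    return Math.floorMod(Integer.hashCode(key), numPartitions);
  }

  // Mirrors the check in the quoted test: a key must never show up on two different partitions.
  static boolean stableAssignment(int[] keys, int numPartitions)
  {
    Map<Integer, Integer> seen = new HashMap<>();
    for (int key : keys) {
      int partition = partitionFor(key, numPartitions);
      Integer previous = seen.putIfAbsent(key, partition);
      if (previous != null && previous != partition) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args)
  {
    int[] keys = {1, 2, 1, 7, 2};
    System.out.println(stableAssignment(keys, 5));
  }
}
```

Any deterministic function of the key satisfies the invariant; the modulo form is used here only because it is short.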
[jira] [Commented] (APEXMALHAR-1701) Deduper : create a deduper backed by Managed State
[ https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403963#comment-15403963 ] ASF GitHub Bot commented on APEXMALHAR-1701: Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/335#discussion_r73155029 --- Diff: library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperIdempotencyTest.java --- @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.dedup; + +import java.io.IOException; +import java.util.Date; + +import javax.validation.ConstraintViolationException; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.fileaccess.FileAccessFSImpl; +import com.datatorrent.lib.fileaccess.TFileImpl; + +public class DeduperIdempotencyTest +{ + public static boolean testFailed = false; + + @Test + public void testApplication() throws IOException, Exception + { +try { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + lma.prepareDAG(new DeduperIdempotencyTestApp(), conf); + LocalMode.Controller lc = lma.getController(); + lc.runAsync(); --- End diff -- done > Deduper : create a deduper backed by Managed State > -- > > Key: APEXMALHAR-1701 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-1701 > Project: Apache Apex Malhar > Issue Type: Task > Components: algorithms >Reporter: Chandni Singh >Assignee: Chandni Singh > > Need a de-deduplicator operator that is based on Managed State.
[GitHub] apex-malhar pull request #335: [Review Only] APEXMALHAR-1701 Deduper with Ma...
Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/335#discussion_r73155029 --- Diff: library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperIdempotencyTest.java --- @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.dedup; + +import java.io.IOException; +import java.util.Date; + +import javax.validation.ConstraintViolationException; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.fileaccess.FileAccessFSImpl; +import com.datatorrent.lib.fileaccess.TFileImpl; + +public class DeduperIdempotencyTest +{ + public static boolean testFailed = false; + + @Test + public void testApplication() throws IOException, Exception + { +try { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + lma.prepareDAG(new DeduperIdempotencyTestApp(), conf); + LocalMode.Controller lc = lma.getController(); + lc.runAsync(); --- End diff -- done
[jira] [Commented] (APEXMALHAR-2100) Development of Inner Join Operator using Spillable Datastructures
[ https://issues.apache.org/jira/browse/APEXMALHAR-2100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403891#comment-15403891 ] ASF GitHub Bot commented on APEXMALHAR-2100: Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/330#discussion_r73145124 --- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractInnerJoinOperator.java --- @@ -0,0 +1,331 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.join; + +import java.util.Arrays; +import java.util.List; + +import javax.validation.constraints.NotNull; + +import org.apache.apex.malhar.lib.state.spillable.Spillable; +import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent; +import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableComplexComponent; +import com.google.common.base.Preconditions; + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.common.util.BaseOperator; + +/** + * + * An abstract implementation of inner join operator. Operator receives tuples from two streams, + * applies the join operation based on constraint and emit the joined value. 
+ * Concrete classes should provide implementation to extractKey, extractTime, mergeTuples methods. + * + * Properties: + * includeFieldStr: List of comma separated fields to be added to the output tuple. + * Ex: Field1,Field2;Field3,Field4 + * keyFields: List of comma separated key field for both the streams. Ex: Field1,Field2 + * timeFields: List of comma separated time field for both the streams. Ex: Field1,Field2 + * expiryTime: Expiry time for stored tuples + * isStream1KeyPrimary: : Specifies whether the stream1 key is primary or not + * isStream2KeyPrimary: : Specifies whether the stream2 key is primary or not + * + * Example: + * Left input port receives customer details and right input port receives Order details. + * Schema for the Customer be in the form of {ID, Name, CTime} + * Schema for the Order be in the form of {OID, CID, OTime} + * Now, Join the tuples of Customer and Order streams where Customer.ID = Order.CID and the constraint is + * matched tuples must have timestamp within 5 minutes. 
+ * Here, key Fields = ID, CID and Time Fields = CTime, OTime, expiryTime = 5 minutes + * + * @displayName Abstract Inner Join Operator + * @tags join + */ +public abstract class AbstractInnerJoinOperator<K,T> extends BaseOperator +{ + protected transient String[][] includeFields; + protected transient List keyFields; + protected transient List timeFields; + @AutoMetric + private long tuplesJoinedPerSec; + private double windowTimeSec; + private int tuplesCount; + @NotNull + private String keyFieldsStr; + @NotNull + private String includeFieldStr; + private String timeFieldsStr; + private Long stream1ExpiryTime; + private Long stream2ExpiryTime; + private boolean isStream1KeyPrimary = true; + private boolean isStream2KeyPrimary = true; + protected SpillableComplexComponent component; + protected Spillable.SpillableByteArrayListMultimap stream1Data; + protected Spillable.SpillableByteArrayListMultimap stream2Data; + + /** + * Process the tuple which are received from input ports with the following steps: + * 1) Extract key from the given tuple + * 2) Insert into the store where store is the stream1Data if the tuple + * receives from stream1 or vice versa. + * 3) Get the values of the key if found it in opposite store + * 4) Merge the given tuple and values found from step (3) + * @param tuple given tuple + * @param isStream1Data Specifies whether the given tuple belongs to stream1 or not. + */ +
[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...
Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/330#discussion_r73145124 --- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractInnerJoinOperator.java --- @@ -0,0 +1,331 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.join; + +import java.util.Arrays; +import java.util.List; + +import javax.validation.constraints.NotNull; + +import org.apache.apex.malhar.lib.state.spillable.Spillable; +import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent; +import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableComplexComponent; +import com.google.common.base.Preconditions; + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.common.util.BaseOperator; + +/** + * + * An abstract implementation of inner join operator. Operator receives tuples from two streams, + * applies the join operation based on constraint and emit the joined value. + * Concrete classes should provide implementation to extractKey, extractTime, mergeTuples methods. + * + * Properties: + * includeFieldStr: List of comma separated fields to be added to the output tuple. 
+ * Ex: Field1,Field2;Field3,Field4 + * keyFields: List of comma separated key field for both the streams. Ex: Field1,Field2 + * timeFields: List of comma separated time field for both the streams. Ex: Field1,Field2 + * expiryTime: Expiry time for stored tuples + * isStream1KeyPrimary: : Specifies whether the stream1 key is primary or not + * isStream2KeyPrimary: : Specifies whether the stream2 key is primary or not + * + * Example: + * Left input port receives customer details and right input port receives Order details. + * Schema for the Customer be in the form of {ID, Name, CTime} + * Schema for the Order be in the form of {OID, CID, OTime} + * Now, Join the tuples of Customer and Order streams where Customer.ID = Order.CID and the constraint is + * matched tuples must have timestamp within 5 minutes. + * Here, key Fields = ID, CID and Time Fields = CTime, OTime, expiryTime = 5 minutes + * + * @displayName Abstract Inner Join Operator + * @tags join + */ +public abstract class AbstractInnerJoinOperator<K,T> extends BaseOperator +{ + protected transient String[][] includeFields; + protected transient List keyFields; + protected transient List timeFields; + @AutoMetric + private long tuplesJoinedPerSec; + private double windowTimeSec; + private int tuplesCount; + @NotNull + private String keyFieldsStr; + @NotNull + private String includeFieldStr; + private String timeFieldsStr; + private Long stream1ExpiryTime; + private Long stream2ExpiryTime; + private boolean isStream1KeyPrimary = true; + private boolean isStream2KeyPrimary = true; + protected SpillableComplexComponent component; + protected Spillable.SpillableByteArrayListMultimap stream1Data; + protected Spillable.SpillableByteArrayListMultimap stream2Data; + + /** + * Process the tuple which are received from input ports with the following steps: + * 1) Extract key from the given tuple + * 2) Insert into the store where store is the stream1Data if the tuple + * receives from stream1 or vice versa. 
+ * 3) Get the values of the key if found it in opposite store + * 4) Merge the given tuple and values found from step (3) + * @param tuple given tuple + * @param isStream1Data Specifies whether the given tuple belongs to stream1 or not. + */ + protected void processTuple(T tuple, boolean isStream1Data) + { +Spillable.SpillableByteArrayListMultimap store = isStream1Data ? stream1Data : stream2Data; +K key = extractKey(tuple,isStream1Data); +if (!store.put(key,
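Editorial note: the processTuple Javadoc quoted above lists four steps (extract key, insert into the tuple's own store, look up the opposite store, merge). A minimal sketch of steps 2 to 4 follows, using the Customer/Order example from the class comment. It assumes plain in-memory maps in place of the Spillable multimaps, takes the key as an already extracted argument, and ignores expiry; InnerJoinSketch and its string-based tuples are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: HashMap-backed stores stand in for the Spillable multimaps.
class InnerJoinSketch
{
  private final Map<String, List<String>> stream1Data = new HashMap<>();
  private final Map<String, List<String>> stream2Data = new HashMap<>();

  // Returns the joined tuples produced by this arrival (possibly none).
  List<String> processTuple(String key, String tuple, boolean isStream1Data)
  {
    Map<String, List<String>> store = isStream1Data ? stream1Data : stream2Data;
    Map<String, List<String>> opposite = isStream1Data ? stream2Data : stream1Data;

    // Step 2: remember the tuple in the store of its own stream.
    store.computeIfAbsent(key, k -> new ArrayList<>()).add(tuple);

    // Step 3: fetch tuples with the same key from the opposite stream.
    List<String> matches = opposite.getOrDefault(key, Collections.<String>emptyList());

    // Step 4: merge, keeping stream1 on the left of every joined tuple.
    List<String> joined = new ArrayList<>();
    for (String match : matches) {
      joined.add(isStream1Data ? tuple + "|" + match : match + "|" + tuple);
    }
    return joined;
  }

  public static void main(String[] args)
  {
    InnerJoinSketch join = new InnerJoinSketch();
    // Customer.ID = 42 arrives first, then an Order with CID = 42.
    System.out.println(join.processTuple("42", "customer:Alice", true));
    System.out.println(join.processTuple("42", "order:1001", false));
  }
}
```

Because each arrival is stored before the lookup, a match is emitted exactly once, by whichever side arrives second.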
[jira] [Commented] (APEXMALHAR-1701) Deduper : create a deduper backed by Managed State
[ https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403872#comment-15403872 ] ASF GitHub Bot commented on APEXMALHAR-1701: Github user chinmaykolhatkar commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/335#discussion_r73142681 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java --- @@ -0,0 +1,459 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.dedup; + +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import javax.validation.constraints.NotNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.state.BucketedState; +import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl; +import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner; +import org.apache.hadoop.classification.InterfaceStability.Evolving; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.Futures; + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.Operator.ActivationListener; +import com.datatorrent.api.annotation.OperatorAnnotation; +import com.datatorrent.lib.fileaccess.FileAccessFSImpl; +import com.datatorrent.lib.fileaccess.TFileImpl; +import com.datatorrent.netlet.util.Slice; + +/** + * Abstract class which allows de-duplicating incoming tuples based on a configured key. + * Also supports expiry mechanism based on a configurable expiry period configured using {@link TimeBucketAssigner} + * in {@link ManagedTimeUnifiedStateImpl} + * Following steps are used in identifying the state of a particular tuple: + * 1. Check if the time of the tuple falls in an expired bucket. If so, the tuple is expired + * 2. If the tuple is a valid event, it is checked in the store whether the same key already exists in the + * time bucket identified by the event time. If, so, the tuple is a duplicate. + * 3. Otherwise the tuple is a unique tuple. 
+ * + * @param <T> type of events + */ +@Evolving +@OperatorAnnotation(checkpointableWithinAppWindow = false) +public abstract class AbstractDeduper<T> +implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener +{ + /** + * The input port on which events are received. + */ + public final transient DefaultInputPort<T> input = new DefaultInputPort<T>() + { +@Override +public final void process(T tuple) +{ + processTuple(tuple); +} + }; + + /** + * The output port on which deduped events are emitted. + */ + public final transient DefaultOutputPort<T> unique = new DefaultOutputPort<>(); + + /** + * The output port on which duplicate events are emitted. + */ + public final transient DefaultOutputPort<T> duplicate = new DefaultOutputPort<>(); + + /** + * The output port on which expired events are emitted. + */ + public final transient DefaultOutputPort<T> expired = new DefaultOutputPort<>(); + + /** + * Whether or not the order of tuples should be maintained. + * Setting this to "true" may cost some performance, but makes the operator idempotent. + */ + private boolean preserveTupleOrder = true; + + @NotNull + protected final ManagedTimeUnifiedStateImpl
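The three-step classification described in the AbstractDeduper class Javadoc (expired, duplicate, unique) can be sketched as follows. This is a minimal in-memory illustration under stated assumptions, not the operator's implementation: `DedupSketch` and `Decision` are hypothetical names, expiry is measured against the latest event time seen so far (the real operator delegates this to TimeBucketAssigner), and old buckets are never purged.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Illustrative stand-in for the dedup decision; not the Malhar implementation. */
class DedupSketch
{
  enum Decision { UNIQUE, DUPLICATE, EXPIRED }

  private final long bucketSpanMillis;
  private final long expiryMillis;
  // Keys seen so far, grouped by time bucket. A real store would purge old buckets.
  private final Map<Long, Set<String>> keysByBucket = new HashMap<>();
  private long latestTimeMillis = Long.MIN_VALUE;

  DedupSketch(long bucketSpanMillis, long expiryMillis)
  {
    this.bucketSpanMillis = bucketSpanMillis;
    this.expiryMillis = expiryMillis;
  }

  Decision processTuple(String key, long eventTimeMillis)
  {
    latestTimeMillis = Math.max(latestTimeMillis, eventTimeMillis);

    // Step 1: a tuple older than the expiry horizon is expired (assumed horizon: latest time seen).
    if (eventTimeMillis < latestTimeMillis - expiryMillis) {
      return Decision.EXPIRED;
    }

    // Step 2: check whether the key already exists in the bucket for this event time.
    long bucket = Math.floorDiv(eventTimeMillis, bucketSpanMillis);
    boolean firstSeen = keysByBucket.computeIfAbsent(bucket, b -> new HashSet<>()).add(key);

    // Step 3: otherwise the tuple is unique.
    return firstSeen ? Decision.UNIQUE : Decision.DUPLICATE;
  }

  public static void main(String[] args)
  {
    DedupSketch dedup = new DedupSketch(1000L, 5000L);
    System.out.println(dedup.processTuple("a", 10_000L));
    System.out.println(dedup.processTuple("a", 10_500L));
    System.out.println(dedup.processTuple("b", 4_000L));
  }
}
```

Note that in this sketch the same key arriving in a different time bucket counts as unique, which follows the wording of step 2 ("in the time bucket identified by the event time").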
[jira] [Commented] (APEXMALHAR-1701) Deduper : create a deduper backed by Managed State
[ https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403871#comment-15403871 ] ASF GitHub Bot commented on APEXMALHAR-1701: Github user chinmaykolhatkar commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/335#discussion_r73142614 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java --- @@ -0,0 +1,459 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.dedup; + +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import javax.validation.constraints.NotNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.state.BucketedState; +import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl; +import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner; +import org.apache.hadoop.classification.InterfaceStability.Evolving; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.Futures; + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.Operator.ActivationListener; +import com.datatorrent.api.annotation.OperatorAnnotation; +import com.datatorrent.lib.fileaccess.FileAccessFSImpl; +import com.datatorrent.lib.fileaccess.TFileImpl; +import com.datatorrent.netlet.util.Slice; + +/** + * Abstract class which allows de-duplicating incoming tuples based on a configured key. + * Also supports expiry mechanism based on a configurable expiry period configured using {@link TimeBucketAssigner} + * in {@link ManagedTimeUnifiedStateImpl} + * Following steps are used in identifying the state of a particular tuple: + * 1. Check if the time of the tuple falls in an expired bucket. If so, the tuple is expired + * 2. If the tuple is a valid event, it is checked in the store whether the same key already exists in the + * time bucket identified by the event time. If, so, the tuple is a duplicate. + * 3. Otherwise the tuple is a unique tuple. 
+ * + * @param <T> type of events + */ +@Evolving +@OperatorAnnotation(checkpointableWithinAppWindow = false) +public abstract class AbstractDeduper<T> +implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener +{ + /** + * The input port on which events are received. + */ + public final transient DefaultInputPort<T> input = new DefaultInputPort<T>() + { +@Override +public final void process(T tuple) +{ + processTuple(tuple); +} + }; + + /** + * The output port on which deduped events are emitted. + */ + public final transient DefaultOutputPort<T> unique = new DefaultOutputPort<>(); + + /** + * The output port on which duplicate events are emitted. + */ + public final transient DefaultOutputPort<T> duplicate = new DefaultOutputPort<>(); + + /** + * The output port on which expired events are emitted. + */ + public final transient DefaultOutputPort<T> expired = new DefaultOutputPort<>(); + + /** + * Whether or not the order of tuples should be maintained. + * Setting this to "true" may cost some performance, but makes the operator idempotent. + */ + private boolean preserveTupleOrder = true; + + @NotNull + protected final ManagedTimeUnifiedStateImpl
[GitHub] apex-malhar pull request #335: [Review Only] APEXMALHAR-1701 Deduper with Ma...
Github user chinmaykolhatkar commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/335#discussion_r73142554 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java --- @@ -0,0 +1,459 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.dedup; + +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import javax.validation.constraints.NotNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.state.BucketedState; +import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl; +import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner; +import org.apache.hadoop.classification.InterfaceStability.Evolving; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.Futures; + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.Operator.ActivationListener; +import com.datatorrent.api.annotation.OperatorAnnotation; +import com.datatorrent.lib.fileaccess.FileAccessFSImpl; +import com.datatorrent.lib.fileaccess.TFileImpl; +import com.datatorrent.netlet.util.Slice; + +/** + * Abstract class which allows de-duplicating incoming tuples based on a configured key. + * Also supports expiry mechanism based on a configurable expiry period configured using {@link TimeBucketAssigner} + * in {@link ManagedTimeUnifiedStateImpl} + * Following steps are used in identifying the state of a particular tuple: + * 1. Check if the time of the tuple falls in an expired bucket. If so, the tuple is expired + * 2. If the tuple is a valid event, it is checked in the store whether the same key already exists in the + * time bucket identified by the event time. If, so, the tuple is a duplicate. + * 3. Otherwise the tuple is a unique tuple. 
+ * + * @param <T> type of events + */ +@Evolving +@OperatorAnnotation(checkpointableWithinAppWindow = false) +public abstract class AbstractDeduper<T> +implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener +{ + /** + * The input port on which events are received. + */ + public final transient DefaultInputPort<T> input = new DefaultInputPort<T>() + { +@Override +public final void process(T tuple) +{ + processTuple(tuple); +} + }; + + /** + * The output port on which deduped events are emitted. + */ + public final transient DefaultOutputPort<T> unique = new DefaultOutputPort<>(); + + /** + * The output port on which duplicate events are emitted. + */ + public final transient DefaultOutputPort<T> duplicate = new DefaultOutputPort<>(); + + /** + * The output port on which expired events are emitted. + */ + public final transient DefaultOutputPort<T> expired = new DefaultOutputPort<>(); + + /** + * Whether or not the order of tuples should be maintained. + * Setting this to "true" may cost some performance, but makes the operator idempotent. + */ + private boolean preserveTupleOrder = true; + + @NotNull + protected final ManagedTimeUnifiedStateImpl managedState = new ManagedTimeUnifiedStateImpl(); + + /** + * Map to hold the result of a tuple processing (unique, duplicate, expired or error) until previous + * tuples get processed. This is used only when {@link #preserveTupleOrder}
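The quoted Javadoc for the result map is cut off, so the following is only a guess at the idea it describes: when tuple order must be preserved, a decision that is already known is held back until every earlier tuple has its decision too. The sketch uses hypothetical names (`OrderedEmitSketch`, `submit`, `drain`), models pending lookups with `CompletableFuture`, and assumes tuples are distinct map keys.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Illustrative order-preserving emitter; not the Malhar implementation. */
class OrderedEmitSketch
{
  // Insertion-ordered map: tuple -> its (possibly still pending) decision.
  private final Map<String, CompletableFuture<String>> waiting = new LinkedHashMap<>();
  final List<String> emitted = new ArrayList<>();

  /** Registers a tuple in arrival order and emits whatever is ready. */
  void submit(String tuple, CompletableFuture<String> decision)
  {
    waiting.put(tuple, decision);
    drain();
  }

  /** Emits decisions from the head of the map, stopping at the first pending one. */
  void drain()
  {
    Iterator<Map.Entry<String, CompletableFuture<String>>> it = waiting.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<String, CompletableFuture<String>> head = it.next();
      if (!head.getValue().isDone()) {
        break; // Later tuples must wait, even if their decisions are already known.
      }
      emitted.add(head.getKey() + "=" + head.getValue().join());
      it.remove();
    }
  }

  public static void main(String[] args)
  {
    OrderedEmitSketch sketch = new OrderedEmitSketch();
    CompletableFuture<String> slowLookup = new CompletableFuture<>();
    sketch.submit("t1", slowLookup);
    sketch.submit("t2", CompletableFuture.completedFuture("DUPLICATE"));
    slowLookup.complete("UNIQUE");
    sketch.drain();
    System.out.println(sketch.emitted);
  }
}
```

The trade-off matches the comment on preserveTupleOrder: holding back ready results costs some latency, but the output order no longer depends on how quickly each lookup completes.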
[jira] [Commented] (APEXMALHAR-1701) Deduper : create a deduper backed by Managed State
[ https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403863#comment-15403863 ] ASF GitHub Bot commented on APEXMALHAR-1701: Github user chinmaykolhatkar commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/335#discussion_r73142273 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java --- @@ -0,0 +1,459 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.dedup; + +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import javax.validation.constraints.NotNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.state.BucketedState; +import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl; +import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner; +import org.apache.hadoop.classification.InterfaceStability.Evolving; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.Futures; + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.Operator.ActivationListener; +import com.datatorrent.api.annotation.OperatorAnnotation; +import com.datatorrent.lib.fileaccess.FileAccessFSImpl; +import com.datatorrent.lib.fileaccess.TFileImpl; +import com.datatorrent.netlet.util.Slice; + +/** + * Abstract class which allows de-duplicating incoming tuples based on a configured key. + * Also supports expiry mechanism based on a configurable expiry period configured using {@link TimeBucketAssigner} + * in {@link ManagedTimeUnifiedStateImpl} + * Following steps are used in identifying the state of a particular tuple: + * 1. Check if the time of the tuple falls in an expired bucket. If so, the tuple is expired + * 2. If the tuple is a valid event, it is checked in the store whether the same key already exists in the + * time bucket identified by the event time. If, so, the tuple is a duplicate. + * 3. Otherwise the tuple is a unique tuple. 
+ * + * @param <T> type of events + */ +@Evolving +@OperatorAnnotation(checkpointableWithinAppWindow = false) +public abstract class AbstractDeduper<T> +implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener +{ + /** + * The input port on which events are received. + */ + public final transient DefaultInputPort<T> input = new DefaultInputPort<T>() + { +@Override +public final void process(T tuple) +{ + processTuple(tuple); +} + }; + + /** + * The output port on which deduped events are emitted. + */ + public final transient DefaultOutputPort<T> unique = new DefaultOutputPort<>(); + + /** + * The output port on which duplicate events are emitted. + */ + public final transient DefaultOutputPort<T> duplicate = new DefaultOutputPort<>(); + + /** + * The output port on which expired events are emitted. + */ + public final transient DefaultOutputPort<T> expired = new DefaultOutputPort<>(); + + /** + * Whether or not the order of tuples should be maintained. + * Setting this to "true" may cost some performance, but makes the operator idempotent. + */ + private boolean preserveTupleOrder = true; + + @NotNull + protected final ManagedTimeUnifiedStateImpl
[jira] [Commented] (APEXMALHAR-1701) Deduper : create a deduper backed by Managed State
[ https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403864#comment-15403864 ] ASF GitHub Bot commented on APEXMALHAR-1701: Github user chinmaykolhatkar commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/335#discussion_r73142337 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java --- @@ -0,0 +1,459 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.dedup; + +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import javax.validation.constraints.NotNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.state.BucketedState; +import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl; +import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner; +import org.apache.hadoop.classification.InterfaceStability.Evolving; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.Futures; + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.Operator.ActivationListener; +import com.datatorrent.api.annotation.OperatorAnnotation; +import com.datatorrent.lib.fileaccess.FileAccessFSImpl; +import com.datatorrent.lib.fileaccess.TFileImpl; +import com.datatorrent.netlet.util.Slice; + +/** + * Abstract class which allows de-duplicating incoming tuples based on a configured key. + * Also supports expiry mechanism based on a configurable expiry period configured using {@link TimeBucketAssigner} + * in {@link ManagedTimeUnifiedStateImpl} + * Following steps are used in identifying the state of a particular tuple: + * 1. Check if the time of the tuple falls in an expired bucket. If so, the tuple is expired + * 2. If the tuple is a valid event, it is checked in the store whether the same key already exists in the + * time bucket identified by the event time. If, so, the tuple is a duplicate. + * 3. Otherwise the tuple is a unique tuple. 
+ * + * @param type of events + */ +@Evolving +@OperatorAnnotation(checkpointableWithinAppWindow = false) +public abstract class AbstractDeduper +implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener +{ + /** + * The input port on which events are received. + */ + public final transient DefaultInputPort input = new DefaultInputPort() + { +@Override +public final void process(T tuple) +{ + processTuple(tuple); +} + }; + + /** + * The output port on which deduped events are emitted. + */ + public final transient DefaultOutputPort unique = new DefaultOutputPort<>(); + + /** + * The output port on which duplicate events are emitted. + */ + public final transient DefaultOutputPort duplicate = new DefaultOutputPort<>(); + + /** + * The output port on which expired events are emitted. + */ + public final transient DefaultOutputPort expired = new DefaultOutputPort<>(); + + /** + * Whether or not the order of tuples be maintained. + * Making this "true" might entail some cost in performance, but makes the operator idempotent. + */ + private boolean preserveTupleOrder = true; + + @NotNull + protected final ManagedTimeUnifiedStateImpl
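The three classification steps listed in the quoted Javadoc can be sketched roughly as follows. This is only an illustration, not the code under review: the class `DedupSketch`, its in-memory `HashMap` store, and the rule that expiry is measured against the largest event time seen so far are all assumptions made for the example. The actual operator delegates bucketing and expiry to `TimeBucketAssigner` and `ManagedTimeUnifiedStateImpl`.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, in-memory stand-in for the dedup decision logic.
class DedupSketch
{
  enum Decision { UNIQUE, DUPLICATE, EXPIRED }

  private final long bucketSpanMillis;
  private final long expiryMillis;
  // bucket id -> keys already seen in that time bucket (never purged in this sketch)
  private final Map<Long, Set<String>> keysByBucket = new HashMap<>();
  // Assumption: expiry is relative to the latest event time observed.
  private long latestTimeMillis = Long.MIN_VALUE;

  DedupSketch(long bucketSpanMillis, long expiryMillis)
  {
    this.bucketSpanMillis = bucketSpanMillis;
    this.expiryMillis = expiryMillis;
  }

  Decision classify(String key, long eventTimeMillis)
  {
    latestTimeMillis = Math.max(latestTimeMillis, eventTimeMillis);

    // Step 1: a tuple whose time falls before the expiry horizon is expired.
    if (eventTimeMillis < latestTimeMillis - expiryMillis) {
      return Decision.EXPIRED;
    }

    // Step 2: look for the same key in the bucket identified by the event time.
    long bucket = Math.floorDiv(eventTimeMillis, bucketSpanMillis);
    Set<String> keys = keysByBucket.computeIfAbsent(bucket, b -> new HashSet<>());
    if (!keys.add(key)) {
      return Decision.DUPLICATE;
    }

    // Step 3: otherwise the tuple is unique.
    return Decision.UNIQUE;
  }
}
```

Note that in this sketch the same key arriving in a different time bucket counts as unique, which follows the wording of step 2 ("in the time bucket identified by the event time").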
[GitHub] apex-malhar pull request #335: [Review Only] APEXMALHAR-1701 Deduper with Ma...
Github user chinmaykolhatkar commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/335#discussion_r73142228 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java --- @@ -0,0 +1,459 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.dedup; + +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import javax.validation.constraints.NotNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.state.BucketedState; +import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl; +import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner; +import org.apache.hadoop.classification.InterfaceStability.Evolving; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.Futures; + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.Operator.ActivationListener; +import com.datatorrent.api.annotation.OperatorAnnotation; +import com.datatorrent.lib.fileaccess.FileAccessFSImpl; +import com.datatorrent.lib.fileaccess.TFileImpl; +import com.datatorrent.netlet.util.Slice; + +/** + * Abstract class which allows de-duplicating incoming tuples based on a configured key. + * Also supports expiry mechanism based on a configurable expiry period configured using {@link TimeBucketAssigner} + * in {@link ManagedTimeUnifiedStateImpl} + * Following steps are used in identifying the state of a particular tuple: + * 1. Check if the time of the tuple falls in an expired bucket. If so, the tuple is expired + * 2. If the tuple is a valid event, it is checked in the store whether the same key already exists in the + * time bucket identified by the event time. If, so, the tuple is a duplicate. + * 3. Otherwise the tuple is a unique tuple. 
+ * + * @param type of events + */ +@Evolving +@OperatorAnnotation(checkpointableWithinAppWindow = false) +public abstract class AbstractDeduper +implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener +{ + /** + * The input port on which events are received. + */ + public final transient DefaultInputPort input = new DefaultInputPort() + { +@Override +public final void process(T tuple) +{ + processTuple(tuple); +} + }; + + /** + * The output port on which deduped events are emitted. + */ + public final transient DefaultOutputPort unique = new DefaultOutputPort<>(); + + /** + * The output port on which duplicate events are emitted. + */ + public final transient DefaultOutputPort duplicate = new DefaultOutputPort<>(); + + /** + * The output port on which expired events are emitted. + */ + public final transient DefaultOutputPort expired = new DefaultOutputPort<>(); + + /** + * Whether or not the order of tuples be maintained. + * Making this "true" might entail some cost in performance, but makes the operator idempotent. + */ + private boolean preserveTupleOrder = true; + + @NotNull + protected final ManagedTimeUnifiedStateImpl managedState = new ManagedTimeUnifiedStateImpl(); + + /** + * Map to hold the result of a tuple processing (unique, duplicate, expired or error) until previous + * tuples get processed. This is used only when {@link #preserveTupleOrder}
[jira] [Commented] (APEXMALHAR-1701) Deduper : create a deduper backed by Managed State
[ https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403857#comment-15403857 ] ASF GitHub Bot commented on APEXMALHAR-1701: Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/335#discussion_r73141988 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/dedup/DeduperStreamCodec.java --- @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.dedup; + +import com.datatorrent.lib.codec.KryoSerializableStreamCodec; +import com.datatorrent.lib.util.PojoUtils; +import com.datatorrent.lib.util.PojoUtils.Getter; + --- End diff -- done > Deduper : create a deduper backed by Managed State > -- > > Key: APEXMALHAR-1701 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-1701 > Project: Apache Apex Malhar > Issue Type: Task > Components: algorithms >Reporter: Chandni Singh >Assignee: Chandni Singh > > Need a de-duplicator operator that is based on Managed State.
[GitHub] apex-malhar pull request #335: [Review Only] APEXMALHAR-1701 Deduper with Ma...
Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/335#discussion_r73142080 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java --- @@ -0,0 +1,459 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.dedup; + +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import javax.validation.constraints.NotNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.state.BucketedState; +import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl; +import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner; +import org.apache.hadoop.classification.InterfaceStability.Evolving; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.Futures; + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.Operator.ActivationListener; +import com.datatorrent.api.annotation.OperatorAnnotation; +import com.datatorrent.lib.fileaccess.FileAccessFSImpl; +import com.datatorrent.lib.fileaccess.TFileImpl; +import com.datatorrent.netlet.util.Slice; + +/** + * Abstract class which allows de-duplicating incoming tuples based on a configured key. + * Also supports expiry mechanism based on a configurable expiry period configured using {@link TimeBucketAssigner} + * in {@link ManagedTimeUnifiedStateImpl} + * Following steps are used in identifying the state of a particular tuple: + * 1. Check if the time of the tuple falls in an expired bucket. If so, the tuple is expired + * 2. If the tuple is a valid event, it is checked in the store whether the same key already exists in the + * time bucket identified by the event time. If, so, the tuple is a duplicate. + * 3. Otherwise the tuple is a unique tuple. 
+ * + * @param type of events + */ +@Evolving +@OperatorAnnotation(checkpointableWithinAppWindow = false) +public abstract class AbstractDeduper +implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener +{ + /** + * The input port on which events are received. + */ + public final transient DefaultInputPort input = new DefaultInputPort() + { +@Override +public final void process(T tuple) +{ + processTuple(tuple); +} + }; + + /** + * The output port on which deduped events are emitted. + */ + public final transient DefaultOutputPort unique = new DefaultOutputPort<>(); + + /** + * The output port on which duplicate events are emitted. + */ + public final transient DefaultOutputPort duplicate = new DefaultOutputPort<>(); + + /** + * The output port on which expired events are emitted. + */ + public final transient DefaultOutputPort expired = new DefaultOutputPort<>(); + + /** + * Whether or not the order of tuples be maintained. + * Making this "true" might entail some cost in performance, but makes the operator idempotent. + */ + private boolean preserveTupleOrder = true; + + @NotNull + protected final ManagedTimeUnifiedStateImpl managedState = new ManagedTimeUnifiedStateImpl(); + + /** + * Map to hold the result of a tuple processing (unique, duplicate, expired or error) until previous + * tuples get processed. This is used only when {@link #preserveTupleOrder} is
[GitHub] apex-malhar pull request #335: [Review Only] APEXMALHAR-1701 Deduper with Ma...
Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/335#discussion_r73141988 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/dedup/DeduperStreamCodec.java --- @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.dedup; + +import com.datatorrent.lib.codec.KryoSerializableStreamCodec; +import com.datatorrent.lib.util.PojoUtils; +import com.datatorrent.lib.util.PojoUtils.Getter; + --- End diff -- done
[jira] [Commented] (APEXMALHAR-2100) Development of Inner Join Operator using Spillable Datastructures
[ https://issues.apache.org/jira/browse/APEXMALHAR-2100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403849#comment-15403849 ] ASF GitHub Bot commented on APEXMALHAR-2100: Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/330#discussion_r73141529 --- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractInnerJoinOperator.java --- @@ -0,0 +1,331 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.join; + +import java.util.Arrays; +import java.util.List; + +import javax.validation.constraints.NotNull; + +import org.apache.apex.malhar.lib.state.spillable.Spillable; +import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent; +import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableComplexComponent; +import com.google.common.base.Preconditions; + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.common.util.BaseOperator; + +/** + * + * An abstract implementation of inner join operator. Operator receives tuples from two streams, + * applies the join operation based on constraint and emit the joined value. 
+ * Concrete classes should provide implementation to extractKey, extractTime, mergeTuples methods. + * + * Properties: + * includeFieldStr: List of comma separated fields to be added to the output tuple. + * Ex: Field1,Field2;Field3,Field4 + * keyFields: List of comma separated key field for both the streams. Ex: Field1,Field2 + * timeFields: List of comma separated time field for both the streams. Ex: Field1,Field2 + * expiryTime: Expiry time for stored tuples + * isStream1KeyPrimary: : Specifies whether the stream1 key is primary or not + * isStream2KeyPrimary: : Specifies whether the stream2 key is primary or not + * + * Example: + * Left input port receives customer details and right input port receives Order details. + * Schema for the Customer be in the form of {ID, Name, CTime} + * Schema for the Order be in the form of {OID, CID, OTime} + * Now, Join the tuples of Customer and Order streams where Customer.ID = Order.CID and the constraint is + * matched tuples must have timestamp within 5 minutes. 
+ * Here, key Fields = ID, CID and Time Fields = CTime, OTime, expiryTime = 5 minutes + * + * @displayName Abstract Inner Join Operator + * @tags join + */ +public abstract class AbstractInnerJoinOperatorextends BaseOperator +{ + protected transient String[][] includeFields; + protected transient List keyFields; + protected transient List timeFields; + @AutoMetric + private long tuplesJoinedPerSec; + private double windowTimeSec; + private int tuplesCount; + @NotNull + private String keyFieldsStr; + @NotNull + private String includeFieldStr; + private String timeFieldsStr; + private Long stream1ExpiryTime; + private Long stream2ExpiryTime; + private boolean isStream1KeyPrimary = true; + private boolean isStream2KeyPrimary = true; + protected SpillableComplexComponent component; + protected Spillable.SpillableByteArrayListMultimap stream1Data; + protected Spillable.SpillableByteArrayListMultimap stream2Data; + + /** + * Process the tuple which are received from input ports with the following steps: + * 1) Extract key from the given tuple + * 2) Insert into the store where store is the stream1Data if the tuple + * receives from stream1 or viceversa. + * 3) Get the values of the key if found it in opposite store + * 4) Merge the given tuple and values found from step (3) + * @param tuple given tuple + * @param isStream1Data Specifies whether the given tuple belongs to stream1 or not. + */ +
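The four processing steps in the quoted `processTuple` Javadoc amount to a symmetric hash join. The following is a minimal sketch of that idea using plain in-memory maps instead of the spillable multimaps. The class `InnerJoinSketch`, the `"key:payload"` tuple encoding, and the `|` separator used for merging are assumptions made purely for illustration, and expiry of stored tuples is left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory analogue of the two-store join described above.
class InnerJoinSketch
{
  private final Map<String, List<String>> stream1Data = new HashMap<>();
  private final Map<String, List<String>> stream2Data = new HashMap<>();
  // Collected join results; a real operator would emit these on an output port.
  final List<String> joined = new ArrayList<>();

  // Assumption: tuples are encoded as "key:payload".
  static String extractKey(String tuple)
  {
    return tuple.substring(0, tuple.indexOf(':'));
  }

  void processTuple(String tuple, boolean isStream1Data)
  {
    Map<String, List<String>> store = isStream1Data ? stream1Data : stream2Data;
    Map<String, List<String>> opposite = isStream1Data ? stream2Data : stream1Data;

    // 1) Extract the key from the given tuple.
    String key = extractKey(tuple);

    // 2) Insert the tuple into the store of its own stream.
    store.computeIfAbsent(key, k -> new ArrayList<>()).add(tuple);

    // 3) Look up the values for the same key in the opposite store.
    List<String> matches = opposite.get(key);
    if (matches == null) {
      return;
    }

    // 4) Merge the given tuple with each match, keeping stream1 on the left.
    for (String other : matches) {
      joined.add(isStream1Data ? tuple + "|" + other : other + "|" + tuple);
    }
  }
}
```

With the Customer/Order example from the Javadoc, a customer tuple `"42:Alice"` on stream1 followed by an order tuple `"42:order-7"` on stream2 produces one joined result, while an order with an unmatched key produces none.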
[jira] [Commented] (APEXMALHAR-2100) Development of Inner Join Operator using Spillable Datastructures
[ https://issues.apache.org/jira/browse/APEXMALHAR-2100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403848#comment-15403848 ] ASF GitHub Bot commented on APEXMALHAR-2100: Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/330#discussion_r73141506 --- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractInnerJoinOperator.java --- @@ -0,0 +1,331 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.join; + +import java.util.Arrays; +import java.util.List; + +import javax.validation.constraints.NotNull; + +import org.apache.apex.malhar.lib.state.spillable.Spillable; +import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent; +import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableComplexComponent; +import com.google.common.base.Preconditions; + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.common.util.BaseOperator; + +/** + * + * An abstract implementation of inner join operator. Operator receives tuples from two streams, + * applies the join operation based on constraint and emit the joined value. 
+ * Concrete classes should provide implementation to extractKey, extractTime, mergeTuples methods. + * + * Properties: + * includeFieldStr: List of comma separated fields to be added to the output tuple. + * Ex: Field1,Field2;Field3,Field4 + * keyFields: List of comma separated key field for both the streams. Ex: Field1,Field2 + * timeFields: List of comma separated time field for both the streams. Ex: Field1,Field2 + * expiryTime: Expiry time for stored tuples --- End diff -- ms. Will add it into java docs. > Development of Inner Join Operator using Spillable Datastructures > - > > Key: APEXMALHAR-2100 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2100 > Project: Apache Apex Malhar > Issue Type: Task >Reporter: Chaitanya >Assignee: Chaitanya >
[jira] [Commented] (APEXMALHAR-2100) Development of Inner Join Operator using Spillable Datastructures
[ https://issues.apache.org/jira/browse/APEXMALHAR-2100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403845#comment-15403845 ] ASF GitHub Bot commented on APEXMALHAR-2100: Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/330#discussion_r73141485 --- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractInnerJoinOperator.java --- @@ -0,0 +1,331 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.join; + +import java.util.Arrays; +import java.util.List; + +import javax.validation.constraints.NotNull; + +import org.apache.apex.malhar.lib.state.spillable.Spillable; +import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent; +import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableComplexComponent; +import com.google.common.base.Preconditions; + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.common.util.BaseOperator; + +/** + * + * An abstract implementation of inner join operator. Operator receives tuples from two streams, + * applies the join operation based on constraint and emit the joined value. 
+ * Concrete classes should provide implementation to extractKey, extractTime, mergeTuples methods. + * + * Properties: + * includeFieldStr: List of comma separated fields to be added to the output tuple. + * Ex: Field1,Field2;Field3,Field4 + * keyFields: List of comma separated key field for both the streams. Ex: Field1,Field2 --- End diff -- Yes. If there are multiple key fields from each stream, then the primary key field has to be specified in keyFields and the other key fields have to be taken care of in mergeTuples(). > Development of Inner Join Operator using Spillable Datastructures > - > > Key: APEXMALHAR-2100 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2100 > Project: Apache Apex Malhar > Issue Type: Task >Reporter: Chaitanya >Assignee: Chaitanya >
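One way to read the reply above: the lookup is done on the primary key only, and any additional key fields are compared when the two tuples are merged. A rough sketch of that pattern follows. The `Customer` and `Order` classes, the `region` field used as a secondary key, the static `mergeTuples` signature, and the convention of returning `null` when the secondary keys differ are all assumptions for illustration; the operator's real `mergeTuples()` signature is not shown in this thread.

```java
import java.util.Objects;

// Hypothetical illustration of checking secondary key fields while merging.
class CompositeKeyMergeSketch
{
  static final class Customer
  {
    final int id;          // primary key, the field that would be listed in keyFields
    final String region;   // secondary key (assumed field)
    final String name;

    Customer(int id, String region, String name)
    {
      this.id = id;
      this.region = region;
      this.name = name;
    }
  }

  static final class Order
  {
    final int cid;         // primary key, the field that would be listed in keyFields
    final String region;   // secondary key (assumed field)
    final String item;

    Order(int cid, String region, String item)
    {
      this.cid = cid;
      this.region = region;
      this.item = item;
    }
  }

  // Called only for tuples whose primary keys already matched.
  // Returns null (an assumed convention) when the secondary keys differ.
  static String mergeTuples(Customer customer, Order order)
  {
    if (!Objects.equals(customer.region, order.region)) {
      return null;
    }
    return customer.name + ":" + order.item;
  }
}
```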
[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...
Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/330#discussion_r73141502 --- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractInnerJoinOperator.java --- @@ -0,0 +1,331 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.join; + +import java.util.Arrays; +import java.util.List; + +import javax.validation.constraints.NotNull; + +import org.apache.apex.malhar.lib.state.spillable.Spillable; +import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent; +import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableComplexComponent; +import com.google.common.base.Preconditions; + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.common.util.BaseOperator; + +/** + * + * An abstract implementation of inner join operator. Operator receives tuples from two streams, + * applies the join operation based on constraint and emit the joined value. + * Concrete classes should provide implementation to extractKey, extractTime, mergeTuples methods. + * + * Properties: + * includeFieldStr: List of comma separated fields to be added to the output tuple. 
+ * Ex: Field1,Field2;Field3,Field4 + * keyFields: List of comma separated key field for both the streams. Ex: Field1,Field2 + * timeFields: List of comma separated time field for both the streams. Ex: Field1,Field2 --- End diff -- Yes. It can be. If the timeField of the stream is not in milliseconds then the user has to override the extractTime() method and convert the field into milliseconds.
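As a small illustration of the conversion described in this reply, the sketch below turns a time field stored in seconds into milliseconds. The `Order` class, its `oTimeSeconds` field, and the static `extractTime` helper are hypothetical; in the operator the conversion would live in an override of `extractTime()`, whose exact signature is not shown in this thread.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for an extractTime() override that normalizes to milliseconds.
class ExtractTimeSketch
{
  static final class Order
  {
    final long oTimeSeconds;   // assumed: the stream carries epoch time in seconds

    Order(long oTimeSeconds)
    {
      this.oTimeSeconds = oTimeSeconds;
    }
  }

  // Convert the tuple's time field from seconds to milliseconds.
  static long extractTime(Order tuple)
  {
    return TimeUnit.SECONDS.toMillis(tuple.oTimeSeconds);
  }
}
```

Using `TimeUnit` rather than a hand-written `* 1000` keeps the source unit explicit, which helps if the field later changes to, say, minutes.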
[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...
Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/330#discussion_r73141529 --- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractInnerJoinOperator.java --- @@ -0,0 +1,331 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.join; + +import java.util.Arrays; +import java.util.List; + +import javax.validation.constraints.NotNull; + +import org.apache.apex.malhar.lib.state.spillable.Spillable; +import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent; +import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableComplexComponent; +import com.google.common.base.Preconditions; + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.common.util.BaseOperator; + +/** + * + * An abstract implementation of inner join operator. Operator receives tuples from two streams, + * applies the join operation based on constraint and emit the joined value. + * Concrete classes should provide implementation to extractKey, extractTime, mergeTuples methods. + * + * Properties: + * includeFieldStr: List of comma separated fields to be added to the output tuple. 
+ * Ex: Field1,Field2;Field3,Field4 + * keyFields: List of comma separated key field for both the streams. Ex: Field1,Field2 + * timeFields: List of comma separated time field for both the streams. Ex: Field1,Field2 + * expiryTime: Expiry time for stored tuples + * isStream1KeyPrimary: : Specifies whether the stream1 key is primary or not + * isStream2KeyPrimary: : Specifies whether the stream2 key is primary or not + * + * Example: + * Left input port receives customer details and right input port receives Order details. + * Schema for the Customer be in the form of {ID, Name, CTime} + * Schema for the Order be in the form of {OID, CID, OTime} + * Now, Join the tuples of Customer and Order streams where Customer.ID = Order.CID and the constraint is + * matched tuples must have timestamp within 5 minutes. + * Here, key Fields = ID, CID and Time Fields = CTime, OTime, expiryTime = 5 minutes + * + * @displayName Abstract Inner Join Operator + * @tags join + */ +public abstract class AbstractInnerJoinOperatorextends BaseOperator +{ + protected transient String[][] includeFields; + protected transient List keyFields; + protected transient List timeFields; + @AutoMetric + private long tuplesJoinedPerSec; + private double windowTimeSec; + private int tuplesCount; + @NotNull + private String keyFieldsStr; + @NotNull + private String includeFieldStr; + private String timeFieldsStr; + private Long stream1ExpiryTime; + private Long stream2ExpiryTime; + private boolean isStream1KeyPrimary = true; + private boolean isStream2KeyPrimary = true; + protected SpillableComplexComponent component; + protected Spillable.SpillableByteArrayListMultimap stream1Data; + protected Spillable.SpillableByteArrayListMultimap stream2Data; + + /** + * Process the tuple which are received from input ports with the following steps: + * 1) Extract key from the given tuple + * 2) Insert into the store where store is the stream1Data if the tuple + * receives from stream1 or viceversa. 
+ * 3) Get the values of the key if found it in opposite store + * 4) Merge the given tuple and values found from step (3) + * @param tuple given tuple + * @param isStream1Data Specifies whether the given tuple belongs to stream1 or not. + */ + protected void processTuple(T tuple, boolean isStream1Data) + { +Spillable.SpillableByteArrayListMultimap store = isStream1Data ? stream1Data : stream2Data; +K key = extractKey(tuple,isStream1Data); +if (!store.put(key,
[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...
Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/330#discussion_r73141506 --- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractInnerJoinOperator.java --- @@ -0,0 +1,331 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.join; + +import java.util.Arrays; +import java.util.List; + +import javax.validation.constraints.NotNull; + +import org.apache.apex.malhar.lib.state.spillable.Spillable; +import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent; +import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableComplexComponent; +import com.google.common.base.Preconditions; + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.common.util.BaseOperator; + +/** + * + * An abstract implementation of inner join operator. Operator receives tuples from two streams, + * applies the join operation based on constraint and emit the joined value. + * Concrete classes should provide implementation to extractKey, extractTime, mergeTuples methods. + * + * Properties: + * includeFieldStr: List of comma separated fields to be added to the output tuple. 
+ * Ex: Field1,Field2;Field3,Field4 + * keyFields: List of comma separated key field for both the streams. Ex: Field1,Field2 + * timeFields: List of comma separated time field for both the streams. Ex: Field1,Field2 + * expiryTime: Expiry time for stored tuples --- End diff -- The unit is ms. I will add it to the Javadoc.
[GitHub] apex-malhar pull request #330: APEXMALHAR-2100 Implementation of Inner Join ...
Github user chaithu14 commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/330#discussion_r73141485 --- Diff: library/src/main/java/com/datatorrent/lib/join/AbstractInnerJoinOperator.java --- @@ -0,0 +1,331 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.join; + +import java.util.Arrays; +import java.util.List; + +import javax.validation.constraints.NotNull; + +import org.apache.apex.malhar.lib.state.spillable.Spillable; +import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent; +import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableComplexComponent; +import com.google.common.base.Preconditions; + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.common.util.BaseOperator; + +/** + * + * An abstract implementation of inner join operator. Operator receives tuples from two streams, + * applies the join operation based on constraint and emit the joined value. + * Concrete classes should provide implementation to extractKey, extractTime, mergeTuples methods. + * + * Properties: + * includeFieldStr: List of comma separated fields to be added to the output tuple. 
+ * Ex: Field1,Field2;Field3,Field4 + * keyFields: List of comma separated key field for both the streams. Ex: Field1,Field2 --- End diff -- Yes. If there are multiple key fields in each stream, the primary key field has to be specified in keyFields and the other key fields have to be handled in mergeTuples().
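To illustrate the reply above, here is a minimal sketch of joining on one primary key and checking the remaining key field while merging. It is plain Java and does not use the Malhar classes; `Order`, `Customer`, `join` and the `mergeTuples` signature shown here are hypothetical names chosen for the example, and the real operator's method signatures may differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not the Malhar API: the primary key drives the lookup,
// the secondary key field is compared inside mergeTuples.
class CompositeKeyJoinSketch
{
  static class Order
  {
    final int customerId; // primary key (the field that would be named in keyFields)
    final String region;  // secondary key, checked during the merge
    final int amount;

    Order(int customerId, String region, int amount)
    {
      this.customerId = customerId;
      this.region = region;
      this.amount = amount;
    }
  }

  static class Customer
  {
    final int customerId; // primary key
    final String region;  // secondary key
    final String name;

    Customer(int customerId, String region, String name)
    {
      this.customerId = customerId;
      this.region = region;
      this.name = name;
    }
  }

  // Returns the joined record, or null when the secondary key does not match.
  static String mergeTuples(Order o, Customer c)
  {
    if (!o.region.equals(c.region)) {
      return null;
    }
    return c.name + "," + o.region + "," + o.amount;
  }

  static List<String> join(List<Order> orders, List<Customer> customers)
  {
    // Index one side by the primary key only.
    Map<Integer, List<Customer>> byPrimaryKey = new HashMap<>();
    for (Customer c : customers) {
      byPrimaryKey.computeIfAbsent(c.customerId, k -> new ArrayList<>()).add(c);
    }
    List<String> out = new ArrayList<>();
    for (Order o : orders) {
      List<Customer> candidates = byPrimaryKey.getOrDefault(o.customerId, Collections.<Customer>emptyList());
      for (Customer c : candidates) {
        String merged = mergeTuples(o, c);
        if (merged != null) {
          out.add(merged);
        }
      }
    }
    return out;
  }

  static List<String> demo()
  {
    List<Order> orders = Arrays.asList(new Order(1, "EU", 10), new Order(1, "US", 20), new Order(2, "EU", 30));
    List<Customer> customers = Arrays.asList(new Customer(1, "EU", "alice"), new Customer(2, "US", "bob"));
    return join(orders, customers);
  }
}
```

In `demo()`, only the first order matches on both the primary key and the region, so the other two candidates are dropped inside `mergeTuples`.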
[jira] [Commented] (APEXMALHAR-1701) Deduper : create a deduper backed by Managed State
[ https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403840#comment-15403840 ] ASF GitHub Bot commented on APEXMALHAR-1701: Github user chinmaykolhatkar commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/335#discussion_r73141245 --- Diff: library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperIdempotencyTest.java --- @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.dedup; + +import java.io.IOException; +import java.util.Date; + +import javax.validation.ConstraintViolationException; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.fileaccess.FileAccessFSImpl; +import com.datatorrent.lib.fileaccess.TFileImpl; + +public class DeduperIdempotencyTest --- End diff -- Nit: Can you rename this to DeduperOrderTest? I don't think this really tests idempotency. :) > Deduper : create a deduper backed by Managed State > -- > > Key: APEXMALHAR-1701 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-1701 > Project: Apache Apex Malhar > Issue Type: Task > Components: algorithms >Reporter: Chandni Singh >Assignee: Chandni Singh > > Need a de-deduplicator operator that is based on Managed State.
[GitHub] apex-malhar pull request #335: [Review Only] APEXMALHAR-1701 Deduper with Ma...
Github user chinmaykolhatkar commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/335#discussion_r73141245 --- Diff: library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperIdempotencyTest.java --- @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.dedup; + +import java.io.IOException; +import java.util.Date; + +import javax.validation.ConstraintViolationException; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.fileaccess.FileAccessFSImpl; +import com.datatorrent.lib.fileaccess.TFileImpl; + +public class DeduperIdempotencyTest --- End diff -- Nit: Can you rename this to DeduperOrderTest? I don't think this really tests idempotency. :)
[jira] [Commented] (APEXMALHAR-1701) Deduper : create a deduper backed by Managed State
[ https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403837#comment-15403837 ] ASF GitHub Bot commented on APEXMALHAR-1701: Github user chinmaykolhatkar commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/335#discussion_r73141100 --- Diff: library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperPartitioningTest.java --- @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.dedup; + +import java.util.Date; +import java.util.HashMap; +import java.util.Random; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; + +import com.google.common.collect.Maps; + +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.common.partitioner.StatelessPartitioner; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.io.ConsoleOutputOperator; + +/** + * Tests whether the operator functions correctly when partitioned + * The partitioning in Dedup is overridden by partitioning on basis of the key in the tuple. + * + */ +public class DeduperPartitioningTest +{ + public static final int NUM_DEDUP_PARTITIONS = 5; + private static boolean testFailed = false; + + /** + * Application to test the partitioning + * + */ + public static class TestDedupApp implements StreamingApplication + { +@Override +public void populateDAG(DAG dag, Configuration conf) +{ + TestGenerator gen = dag.addOperator("Generator", new TestGenerator()); + + TestDeduper dedup = dag.addOperator("Deduper", new TestDeduper()); + dedup.setKeyExpression("id"); + dedup.setTimeExpression("eventTime.getTime()"); + dedup.setBucketSpan(60); + dedup.setExpireBefore(600); + + ConsoleOutputOperator console = dag.addOperator("Console", new ConsoleOutputOperator()); + dag.addStream("Generator to Dedup", gen.output, dedup.input); + dag.addStream("Dedup to Console", dedup.unique, console.input); + dag.setInputPortAttribute(dedup.input, Context.PortContext.TUPLE_CLASS, TestEvent.class); + dag.setOutputPortAttribute(dedup.unique, Context.PortContext.TUPLE_CLASS, TestEvent.class); + dag.setAttribute(dedup, 
Context.OperatorContext.PARTITIONER, + new StatelessPartitioner(NUM_DEDUP_PARTITIONS)); +} + } + + public static class TestDeduper extends TimeBasedDedupOperator + { +int operatorId; +boolean started = false; +HashMap<Integer, Integer> partitionMap = Maps.newHashMap(); + +@Override +public void setup(OperatorContext context) +{ + super.setup(context); + operatorId = context.getId(); +} + +@Override +protected void processTuple(Object tuple) +{ + TestEvent event = (TestEvent)tuple; + if (partitionMap.containsKey(event.id)) { +if (partitionMap.get(event.id) != operatorId) { + testFailed = true; + throw new RuntimeException("Wrong tuple assignment"); +} + } else { +partitionMap.put(event.id, operatorId); + } +} + } + + public static class TestGenerator extends BaseOperator implements InputOperator + { + +public final transient DefaultOutputPort<TestEvent> output = new DefaultOutputPort<>(); +private final transient Random r = new Random(); + +@Override +public void
[GitHub] apex-malhar pull request #335: [Review Only] APEXMALHAR-1701 Deduper with Ma...
Github user chinmaykolhatkar commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/335#discussion_r73141070 --- Diff: library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperIdempotencyTest.java --- @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.dedup; + +import java.io.IOException; +import java.util.Date; + +import javax.validation.ConstraintViolationException; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.fileaccess.FileAccessFSImpl; +import com.datatorrent.lib.fileaccess.TFileImpl; + +public class DeduperIdempotencyTest +{ + public static boolean testFailed = false; + + @Test + public void testApplication() throws IOException, Exception + { +try { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + lma.prepareDAG(new DeduperIdempotencyTestApp(), conf); + LocalMode.Controller lc = lma.getController(); + lc.runAsync(); --- End diff -- Can you run the test in the Controller's async mode, using an approach similar to com.datatorrent.lib.io.fs.FileSplitterBaseTest.testSplitterInApp? This gives a deterministic way of verifying the cases.
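A minimal sketch of the kind of deterministic check the comment above asks for, under the assumption that the referenced test follows a "start asynchronously, wait for an explicit completion signal, then shut down" pattern (not verified here). It is plain Java with no Apex dependency: the background thread stands in for the application started by `lc.runAsync()`, the interrupt stands in for `lc.shutdown()`, and `AsyncRunSketch` and `runUntilDone` are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: wait on a latch instead of sleeping for a fixed time.
class AsyncRunSketch
{
  static int runUntilDone(final int expectedTuples, long timeoutMillis)
  {
    final AtomicInteger processed = new AtomicInteger();
    final CountDownLatch done = new CountDownLatch(expectedTuples);

    // Stand-in for the running application: counts down once per processed tuple.
    Thread app = new Thread(() -> {
      for (int i = 0; i < expectedTuples; i++) {
        processed.incrementAndGet();
        done.countDown();
      }
    });
    app.setDaemon(true);
    app.start(); // plays the role of lc.runAsync()

    try {
      // Block until every expected tuple was seen, or fail after the timeout.
      boolean finished = done.await(timeoutMillis, TimeUnit.MILLISECONDS);
      app.interrupt(); // plays the role of lc.shutdown()
      app.join(timeoutMillis);
      if (!finished) {
        throw new IllegalStateException("Timed out waiting for " + expectedTuples + " tuples");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting", e);
    }
    return processed.get();
  }

  public static void main(String[] args)
  {
    System.out.println(runUntilDone(100, 5000L));
  }
}
```

The assertions then run only after the completion signal, so the outcome does not depend on how long the application happens to take.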
[GitHub] apex-malhar pull request #335: [Review Only] APEXMALHAR-1701 Deduper with Ma...
Github user chinmaykolhatkar commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/335#discussion_r73141100 --- Diff: library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperPartitioningTest.java --- @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.dedup; + +import java.util.Date; +import java.util.HashMap; +import java.util.Random; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; + +import com.google.common.collect.Maps; + +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.common.partitioner.StatelessPartitioner; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.io.ConsoleOutputOperator; + +/** + * Tests whether the operator functions correctly when partitioned + * The partitioning in Dedup is overridden by partitioning on basis of the key in the tuple. 
+ * + */ +public class DeduperPartitioningTest +{ + public static final int NUM_DEDUP_PARTITIONS = 5; + private static boolean testFailed = false; + + /** + * Application to test the partitioning + * + */ + public static class TestDedupApp implements StreamingApplication + { +@Override +public void populateDAG(DAG dag, Configuration conf) +{ + TestGenerator gen = dag.addOperator("Generator", new TestGenerator()); + + TestDeduper dedup = dag.addOperator("Deduper", new TestDeduper()); + dedup.setKeyExpression("id"); + dedup.setTimeExpression("eventTime.getTime()"); + dedup.setBucketSpan(60); + dedup.setExpireBefore(600); + + ConsoleOutputOperator console = dag.addOperator("Console", new ConsoleOutputOperator()); + dag.addStream("Generator to Dedup", gen.output, dedup.input); + dag.addStream("Dedup to Console", dedup.unique, console.input); + dag.setInputPortAttribute(dedup.input, Context.PortContext.TUPLE_CLASS, TestEvent.class); + dag.setOutputPortAttribute(dedup.unique, Context.PortContext.TUPLE_CLASS, TestEvent.class); + dag.setAttribute(dedup, Context.OperatorContext.PARTITIONER, + new StatelessPartitioner(NUM_DEDUP_PARTITIONS)); +} + } + + public static class TestDeduper extends TimeBasedDedupOperator + { +int operatorId; +boolean started = false; +HashMap<Integer, Integer> partitionMap = Maps.newHashMap(); + +@Override +public void setup(OperatorContext context) +{ + super.setup(context); + operatorId = context.getId(); +} + +@Override +protected void processTuple(Object tuple) +{ + TestEvent event = (TestEvent)tuple; + if (partitionMap.containsKey(event.id)) { +if (partitionMap.get(event.id) != operatorId) { + testFailed = true; + throw new RuntimeException("Wrong tuple assignment"); +} + } else { +partitionMap.put(event.id, operatorId); + } +} + } + + public static class TestGenerator extends BaseOperator implements InputOperator + { + +public final transient DefaultOutputPort<TestEvent> output = new DefaultOutputPort<>(); +private final transient Random r = new Random(); + +@Override 
+public void emitTuples() +{ + TestEvent event = new TestEvent(); + event.id = r.nextInt(100); + output.emit(event); +} + } + + public static class TestEvent + { +private int id; +private
[jira] [Commented] (APEXMALHAR-1701) Deduper : create a deduper backed by Managed State
[ https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403809#comment-15403809 ] ASF GitHub Bot commented on APEXMALHAR-1701: Github user chinmaykolhatkar commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/335#discussion_r73139258 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java --- @@ -0,0 +1,459 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.dedup; + +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import javax.validation.constraints.NotNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.state.BucketedState; +import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl; +import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner; +import org.apache.hadoop.classification.InterfaceStability.Evolving; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.Futures; + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.Operator.ActivationListener; +import com.datatorrent.api.annotation.OperatorAnnotation; +import com.datatorrent.lib.fileaccess.FileAccessFSImpl; +import com.datatorrent.lib.fileaccess.TFileImpl; +import com.datatorrent.netlet.util.Slice; + +/** + * Abstract class which allows de-duplicating incoming tuples based on a configured key. + * Also supports expiry mechanism based on a configurable expiry period configured using {@link TimeBucketAssigner} + * in {@link ManagedTimeUnifiedStateImpl} + * Following steps are used in identifying the state of a particular tuple: + * 1. Check if the time of the tuple falls in an expired bucket. If so, the tuple is expired + * 2. If the tuple is a valid event, it is checked in the store whether the same key already exists in the + * time bucket identified by the event time. If, so, the tuple is a duplicate. + * 3. Otherwise the tuple is a unique tuple. 
+ * + * @param type of events + */ +@Evolving +@OperatorAnnotation(checkpointableWithinAppWindow = false) +public abstract class AbstractDeduper +implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener +{ + /** + * The input port on which events are received. + */ + public final transient DefaultInputPort input = new DefaultInputPort() + { +@Override +public final void process(T tuple) +{ + processTuple(tuple); +} + }; + + /** + * The output port on which deduped events are emitted. + */ + public final transient DefaultOutputPort unique = new DefaultOutputPort<>(); + + /** + * The output port on which duplicate events are emitted. + */ + public final transient DefaultOutputPort duplicate = new DefaultOutputPort<>(); + + /** + * The output port on which expired events are emitted. + */ + public final transient DefaultOutputPort expired = new DefaultOutputPort<>(); + + /** + * Whether or not the order of tuples be maintained. + * Making this "true" might entail some cost in performance, but makes the operator idempotent. + */ + private boolean preserveTupleOrder = true; + + @NotNull + protected final ManagedTimeUnifiedStateImpl
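The three steps listed in the AbstractDeduper Javadoc quoted above (expired, duplicate, otherwise unique) can be sketched roughly as follows. This is an in-memory illustration only: it does not use ManagedTimeUnifiedStateImpl or TimeBucketAssigner, it approximates "falls in an expired bucket" with a simple cutoff on the event time, and `DedupDecisionSketch`, `decide`, `bucketSpanMillis` and `expireBeforeMillis` are hypothetical names.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory sketch of the expired / duplicate / unique decision.
class DedupDecisionSketch
{
  enum Decision { UNIQUE, DUPLICATE, EXPIRED }

  private final long bucketSpanMillis;
  private final long expireBeforeMillis;
  // Keys seen so far, grouped by the time bucket of their event time.
  private final Map<Long, Set<String>> keysByBucket = new HashMap<>();

  DedupDecisionSketch(long bucketSpanMillis, long expireBeforeMillis)
  {
    this.bucketSpanMillis = bucketSpanMillis;
    this.expireBeforeMillis = expireBeforeMillis;
  }

  Decision decide(String key, long eventTimeMillis, long nowMillis)
  {
    // Step 1: events older than the expiry cutoff are expired.
    if (eventTimeMillis < nowMillis - expireBeforeMillis) {
      return Decision.EXPIRED;
    }
    // Step 2: look for the same key in the bucket identified by the event time.
    long bucket = eventTimeMillis / bucketSpanMillis;
    Set<String> keys = keysByBucket.computeIfAbsent(bucket, b -> new HashSet<>());
    // Set.add returns false when the key is already present, i.e. a duplicate.
    // Step 3: otherwise the key is recorded and the tuple is unique.
    return keys.add(key) ? Decision.UNIQUE : Decision.DUPLICATE;
  }
}
```

Note that, as in the Javadoc, the duplicate check is scoped to the time bucket of the event, so the same key in a different bucket is treated as unique in this sketch.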
[jira] [Commented] (APEXMALHAR-1701) Deduper : create a deduper backed by Managed State
[ https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403808#comment-15403808 ] ASF GitHub Bot commented on APEXMALHAR-1701: Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/335#discussion_r73139110 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java --- @@ -0,0 +1,462 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.dedup; + +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import javax.validation.constraints.NotNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.state.BucketedState; +import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl; +import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner; +import org.apache.hadoop.classification.InterfaceStability.Evolving; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.Futures; + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.Operator.ActivationListener; +import com.datatorrent.api.annotation.OperatorAnnotation; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.lib.fileaccess.FileAccessFSImpl; +import com.datatorrent.lib.fileaccess.TFileImpl; +import com.datatorrent.netlet.util.Slice; + +/** + * Abstract class which allows de-duplicating incoming tuples based on a configured key. + * Also supports expiry mechanism based on a configurable expiry period configured using {@link TimeBucketAssigner} + * in {@link ManagedTimeUnifiedStateImpl} + * + * @param type of events + */ +@Evolving +@OperatorAnnotation(checkpointableWithinAppWindow = false) +public abstract class AbstractDeduper +implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener +{ + /** + * The input port on which events are received. 
+ */ + public final transient DefaultInputPort input = new DefaultInputPort() + { +@Override +public final void process(T tuple) +{ + processTuple(tuple); +} + }; + + /** + * The output port on which deduped events are emitted. + */ + public final transient DefaultOutputPort unique = new DefaultOutputPort<>(); + + /** + * The output port on which duplicate events are emitted. + */ + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort duplicate = new DefaultOutputPort<>(); + + /** + * The output port on which expired events are emitted. + */ + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort expired = new DefaultOutputPort<>(); + + /** + * Whether or not the order of tuples be maintained + */ + private boolean orderedOutput = false; + + @NotNull + protected final ManagedTimeUnifiedStateImpl managedState = new ManagedTimeUnifiedStateImpl(); + + /** + * Map to hold the result of a tuple processing (unique, duplicate, expired or error) until previous + * tuples get processed. This is used only when {@link #orderedOutput} is true. + */ + private transient Mapdecisions; + private transient long sleepMillis;
[GitHub] apex-malhar pull request #335: [Review Only] APEXMALHAR-1701 Deduper with Ma...
Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/335#discussion_r73139110 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java --- @@ -0,0 +1,462 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.dedup; + +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import javax.validation.constraints.NotNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.state.BucketedState; +import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl; +import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner; +import org.apache.hadoop.classification.InterfaceStability.Evolving; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.Futures; + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.Operator.ActivationListener; +import com.datatorrent.api.annotation.OperatorAnnotation; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.lib.fileaccess.FileAccessFSImpl; +import com.datatorrent.lib.fileaccess.TFileImpl; +import com.datatorrent.netlet.util.Slice; + +/** + * Abstract class which allows de-duplicating incoming tuples based on a configured key. + * Also supports expiry mechanism based on a configurable expiry period configured using {@link TimeBucketAssigner} + * in {@link ManagedTimeUnifiedStateImpl} + * + * @param type of events + */ +@Evolving +@OperatorAnnotation(checkpointableWithinAppWindow = false) +public abstract class AbstractDeduper +implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener +{ + /** + * The input port on which events are received. 
+ */ + public final transient DefaultInputPort input = new DefaultInputPort() + { +@Override +public final void process(T tuple) +{ + processTuple(tuple); +} + }; + + /** + * The output port on which deduped events are emitted. + */ + public final transient DefaultOutputPort unique = new DefaultOutputPort<>(); + + /** + * The output port on which duplicate events are emitted. + */ + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort duplicate = new DefaultOutputPort<>(); + + /** + * The output port on which expired events are emitted. + */ + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort expired = new DefaultOutputPort<>(); + + /** + * Whether or not the order of tuples be maintained + */ + private boolean orderedOutput = false; + + @NotNull + protected final ManagedTimeUnifiedStateImpl managedState = new ManagedTimeUnifiedStateImpl(); + + /** + * Map to hold the result of a tuple processing (unique, duplicate, expired or error) until previous + * tuples get processed. This is used only when {@link #orderedOutput} is true. + */ + private transient Mapdecisions; + private transient long sleepMillis; + private transient Map waitingEvents = Maps.newLinkedHashMap(); + private transient Map asyncEvents = Maps.newLinkedHashMap(); + + // Metrics + @AutoMetric + private transient long uniqueEvents; +
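The Javadoc on the decisions map quoted above says results are held until previous tuples have been processed when orderedOutput is true. A minimal sketch of that idea, assuming decisions can complete out of arrival order and that tuples are distinct; it is not the operator's actual code, and `OrderedEmitSketch`, `arrive`, `complete` and `getEmitted` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: emit results strictly in arrival order, even when the
// decision for a later tuple becomes available first.
class OrderedEmitSketch
{
  // Insertion-ordered map; a null value means the decision is still pending.
  private final Map<String, String> decisions = new LinkedHashMap<>();
  private final List<String> emitted = new ArrayList<>();

  void arrive(String tuple)
  {
    decisions.put(tuple, null);
  }

  void complete(String tuple, String decision)
  {
    // Updating an existing key keeps its original position in a LinkedHashMap.
    decisions.put(tuple, decision);
    drain();
  }

  // Emit the longest prefix of tuples whose decisions are known.
  private void drain()
  {
    Iterator<Map.Entry<String, String>> it = decisions.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<String, String> entry = it.next();
      if (entry.getValue() == null) {
        break; // an earlier tuple is still pending, so later ones must wait
      }
      emitted.add(entry.getKey() + ":" + entry.getValue());
      it.remove();
    }
  }

  List<String> getEmitted()
  {
    return emitted;
  }
}
```

The cost of ordering is visible here: a completed result for a later tuple sits in the map until every earlier tuple has been decided.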
[jira] [Commented] (APEXMALHAR-1701) Deduper : create a deduper backed by Managed State
[ https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403802#comment-15403802 ] ASF GitHub Bot commented on APEXMALHAR-1701: Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/335#discussion_r73139012 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java --- @@ -0,0 +1,462 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.dedup; + +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import javax.validation.constraints.NotNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.state.BucketedState; +import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl; +import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner; +import org.apache.hadoop.classification.InterfaceStability.Evolving; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.Futures; + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.Operator.ActivationListener; +import com.datatorrent.api.annotation.OperatorAnnotation; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.lib.fileaccess.FileAccessFSImpl; +import com.datatorrent.lib.fileaccess.TFileImpl; +import com.datatorrent.netlet.util.Slice; + +/** + * Abstract class which allows de-duplicating incoming tuples based on a configured key. + * Also supports expiry mechanism based on a configurable expiry period configured using {@link TimeBucketAssigner} + * in {@link ManagedTimeUnifiedStateImpl} + * + * @param <T> type of events + */ +@Evolving +@OperatorAnnotation(checkpointableWithinAppWindow = false) +public abstract class AbstractDeduper<T> +implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener +{ + /** + * The input port on which events are received.
+ */ + public final transient DefaultInputPort<T> input = new DefaultInputPort<T>() + { +@Override +public final void process(T tuple) +{ + processTuple(tuple); +} + }; + + /** + * The output port on which deduped events are emitted. + */ + public final transient DefaultOutputPort<T> unique = new DefaultOutputPort<>(); + + /** + * The output port on which duplicate events are emitted. + */ + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<T> duplicate = new DefaultOutputPort<>(); + + /** + * The output port on which expired events are emitted. + */ + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<T> expired = new DefaultOutputPort<>(); + + /** + * Whether or not the order of tuples is maintained. + */ + private boolean orderedOutput = false; + + @NotNull + protected final ManagedTimeUnifiedStateImpl managedState = new ManagedTimeUnifiedStateImpl(); + + /** + * Map to hold the result of processing a tuple (unique, duplicate, expired or error) until previous + * tuples get processed. This is used only when {@link #orderedOutput} is true. + */ + private transient Map decisions; + private transient long sleepMillis;
[GitHub] apex-malhar pull request #335: [Review Only] APEXMALHAR-1701 Deduper with Ma...
Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/335#discussion_r73139012 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java --- @@ -0,0 +1,462 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.dedup; + +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import javax.validation.constraints.NotNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.state.BucketedState; +import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl; +import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner; +import org.apache.hadoop.classification.InterfaceStability.Evolving; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.Futures; + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.Operator.ActivationListener; +import com.datatorrent.api.annotation.OperatorAnnotation; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.lib.fileaccess.FileAccessFSImpl; +import com.datatorrent.lib.fileaccess.TFileImpl; +import com.datatorrent.netlet.util.Slice; + +/** + * Abstract class which allows de-duplicating incoming tuples based on a configured key. + * Also supports expiry mechanism based on a configurable expiry period configured using {@link TimeBucketAssigner} + * in {@link ManagedTimeUnifiedStateImpl} + * + * @param <T> type of events + */ +@Evolving +@OperatorAnnotation(checkpointableWithinAppWindow = false) +public abstract class AbstractDeduper<T> +implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener +{ + /** + * The input port on which events are received.
+ */ + public final transient DefaultInputPort<T> input = new DefaultInputPort<T>() + { +@Override +public final void process(T tuple) +{ + processTuple(tuple); +} + }; + + /** + * The output port on which deduped events are emitted. + */ + public final transient DefaultOutputPort<T> unique = new DefaultOutputPort<>(); + + /** + * The output port on which duplicate events are emitted. + */ + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<T> duplicate = new DefaultOutputPort<>(); + + /** + * The output port on which expired events are emitted. + */ + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<T> expired = new DefaultOutputPort<>(); + + /** + * Whether or not the order of tuples is maintained. + */ + private boolean orderedOutput = false; + + @NotNull + protected final ManagedTimeUnifiedStateImpl managedState = new ManagedTimeUnifiedStateImpl(); + + /** + * Map to hold the result of processing a tuple (unique, duplicate, expired or error) until previous + * tuples get processed. This is used only when {@link #orderedOutput} is true. + */ + private transient Map decisions; + private transient long sleepMillis; + private transient Map waitingEvents = Maps.newLinkedHashMap(); + private transient Map asyncEvents = Maps.newLinkedHashMap(); + + // Metrics + @AutoMetric + private transient long uniqueEvents; +
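The javadoc on the decisions map in the diff above says that results are held until previous tuples get processed when orderedOutput is true. A minimal plain-Java sketch of that idea follows. It is not taken from the pull request: the class `OrderedEmitSketch`, its methods `arrived`, `decided` and `emitted`, and the string form of the output are all hypothetical, and only the four decision names come from the javadoc.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of ordered emission; not the AbstractDeduper code. */
class OrderedEmitSketch<T> {
  /** Decision names taken from the javadoc: unique, duplicate, expired or error. */
  enum Decision { UNIQUE, DUPLICATE, EXPIRED, ERROR }

  // Insertion-ordered map: iteration follows arrival order of the tuples.
  // A null value means "no decision yet". Assumes tuples are distinct as map keys.
  private final Map<T, Decision> decisions = new LinkedHashMap<>();
  private final List<String> emitted = new ArrayList<>();

  /** Records that a tuple arrived and is waiting for a decision. */
  void arrived(T tuple) {
    decisions.put(tuple, null);
  }

  /** Records a decision, then emits every leading tuple that already has one. */
  void decided(T tuple, Decision decision) {
    // Re-putting an existing key does not change its position in a LinkedHashMap.
    decisions.put(tuple, decision);
    Iterator<Map.Entry<T, Decision>> it = decisions.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<T, Decision> head = it.next();
      if (head.getValue() == null) {
        break; // an earlier tuple is still undecided, so later ones must wait
      }
      emitted.add(head.getKey() + ":" + head.getValue());
      it.remove();
    }
  }

  List<String> emitted() {
    return emitted;
  }

  public static void main(String[] args) {
    OrderedEmitSketch<String> sketch = new OrderedEmitSketch<>();
    sketch.arrived("t1");
    sketch.arrived("t2");
    sketch.decided("t2", Decision.DUPLICATE); // held back behind t1
    sketch.decided("t1", Decision.UNIQUE);    // releases t1, then t2
    System.out.println(sketch.emitted());
  }
}
```

The sketch trades latency for ordering: a tuple whose decision is ready early still waits behind any undecided predecessor, which is presumably why the operator makes ordering optional.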
[jira] [Commented] (APEXMALHAR-1701) Deduper : create a deduper backed by Managed State
[ https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403768#comment-15403768 ] ASF GitHub Bot commented on APEXMALHAR-1701: Github user chinmaykolhatkar commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/335#discussion_r73134653 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/dedup/DeduperStreamCodec.java --- @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.dedup; + +import com.datatorrent.lib.codec.KryoSerializableStreamCodec; +import com.datatorrent.lib.util.PojoUtils; +import com.datatorrent.lib.util.PojoUtils.Getter; + --- End diff -- Please add a javadoc comment describing how this works? > Deduper : create a deduper backed by Managed State > -- > > Key: APEXMALHAR-1701 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-1701 > Project: Apache Apex Malhar > Issue Type: Task > Components: algorithms >Reporter: Chandni Singh >Assignee: Chandni Singh > > Need a de-deduplicator operator that is based on Managed State. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (APEXMALHAR-1701) Deduper : create a deduper backed by Managed State
[ https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403767#comment-15403767 ] ASF GitHub Bot commented on APEXMALHAR-1701: Github user chinmaykolhatkar commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/335#discussion_r73134312 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/dedup/DeduperStreamCodec.java --- @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.dedup; + +import com.datatorrent.lib.codec.KryoSerializableStreamCodec; +import com.datatorrent.lib.util.PojoUtils; +import com.datatorrent.lib.util.PojoUtils.Getter; + +public class DeduperStreamCodec extends KryoSerializableStreamCodec --- End diff -- Can you make this Evolving? > Deduper : create a deduper backed by Managed State > -- > > Key: APEXMALHAR-1701 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-1701 > Project: Apache Apex Malhar > Issue Type: Task > Components: algorithms >Reporter: Chandni Singh >Assignee: Chandni Singh > > Need a de-deduplicator operator that is based on Managed State. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] apex-malhar pull request #335: [Review Only] APEXMALHAR-1701 Deduper with Ma...
Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/335#discussion_r73132546 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java --- @@ -0,0 +1,462 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.dedup; + +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import javax.validation.constraints.NotNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.state.BucketedState; +import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl; +import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner; +import org.apache.hadoop.classification.InterfaceStability.Evolving; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.Futures; + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.Operator.ActivationListener; +import com.datatorrent.api.annotation.OperatorAnnotation; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.lib.fileaccess.FileAccessFSImpl; +import com.datatorrent.lib.fileaccess.TFileImpl; +import com.datatorrent.netlet.util.Slice; + +/** + * Abstract class which allows de-duplicating incoming tuples based on a configured key. + * Also supports expiry mechanism based on a configurable expiry period configured using {@link TimeBucketAssigner} + * in {@link ManagedTimeUnifiedStateImpl} + * + * @param <T> type of events + */ +@Evolving +@OperatorAnnotation(checkpointableWithinAppWindow = false) +public abstract class AbstractDeduper<T> +implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener +{ + /** + * The input port on which events are received.
+ */ + public final transient DefaultInputPort<T> input = new DefaultInputPort<T>() + { +@Override +public final void process(T tuple) +{ + processTuple(tuple); +} + }; + + /** + * The output port on which deduped events are emitted. + */ + public final transient DefaultOutputPort<T> unique = new DefaultOutputPort<>(); + + /** + * The output port on which duplicate events are emitted. + */ + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<T> duplicate = new DefaultOutputPort<>(); + + /** + * The output port on which expired events are emitted. + */ + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<T> expired = new DefaultOutputPort<>(); + + /** + * Whether or not the order of tuples is maintained. + */ + private boolean orderedOutput = false; + + @NotNull + protected final ManagedTimeUnifiedStateImpl managedState = new ManagedTimeUnifiedStateImpl(); + + /** + * Map to hold the result of processing a tuple (unique, duplicate, expired or error) until previous + * tuples get processed. This is used only when {@link #orderedOutput} is true. + */ + private transient Map decisions; + private transient long sleepMillis; + private transient Map waitingEvents = Maps.newLinkedHashMap(); + private transient Map asyncEvents = Maps.newLinkedHashMap(); + + // Metrics + @AutoMetric + private transient long uniqueEvents; +
[jira] [Commented] (APEXMALHAR-1701) Deduper : create a deduper backed by Managed State
[ https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403751#comment-15403751 ] ASF GitHub Bot commented on APEXMALHAR-1701: Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/335#discussion_r73132603 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java --- @@ -0,0 +1,462 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.dedup; + +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import javax.validation.constraints.NotNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.state.BucketedState; +import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl; +import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner; +import org.apache.hadoop.classification.InterfaceStability.Evolving; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.Futures; + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.Operator.ActivationListener; +import com.datatorrent.api.annotation.OperatorAnnotation; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.lib.fileaccess.FileAccessFSImpl; +import com.datatorrent.lib.fileaccess.TFileImpl; +import com.datatorrent.netlet.util.Slice; + +/** + * Abstract class which allows de-duplicating incoming tuples based on a configured key. + * Also supports expiry mechanism based on a configurable expiry period configured using {@link TimeBucketAssigner} + * in {@link ManagedTimeUnifiedStateImpl} + * + * @param <T> type of events + */ +@Evolving +@OperatorAnnotation(checkpointableWithinAppWindow = false) +public abstract class AbstractDeduper<T> +implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener +{ + /** + * The input port on which events are received.
+ */ + public final transient DefaultInputPort<T> input = new DefaultInputPort<T>() + { +@Override +public final void process(T tuple) +{ + processTuple(tuple); +} + }; + + /** + * The output port on which deduped events are emitted. + */ + public final transient DefaultOutputPort<T> unique = new DefaultOutputPort<>(); + + /** + * The output port on which duplicate events are emitted. + */ + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<T> duplicate = new DefaultOutputPort<>(); + + /** + * The output port on which expired events are emitted. + */ + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<T> expired = new DefaultOutputPort<>(); + + /** + * Whether or not the order of tuples is maintained. + */ + private boolean orderedOutput = false; + + @NotNull + protected final ManagedTimeUnifiedStateImpl managedState = new ManagedTimeUnifiedStateImpl(); + + /** + * Map to hold the result of processing a tuple (unique, duplicate, expired or error) until previous + * tuples get processed. This is used only when {@link #orderedOutput} is true. + */ + private transient Map decisions; + private transient long sleepMillis;
[jira] [Commented] (APEXMALHAR-1701) Deduper : create a deduper backed by Managed State
[ https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403750#comment-15403750 ] ASF GitHub Bot commented on APEXMALHAR-1701: Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/335#discussion_r73132546 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java --- @@ -0,0 +1,462 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.dedup; + +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import javax.validation.constraints.NotNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.state.BucketedState; +import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl; +import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner; +import org.apache.hadoop.classification.InterfaceStability.Evolving; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.Futures; + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.Operator.ActivationListener; +import com.datatorrent.api.annotation.OperatorAnnotation; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.lib.fileaccess.FileAccessFSImpl; +import com.datatorrent.lib.fileaccess.TFileImpl; +import com.datatorrent.netlet.util.Slice; + +/** + * Abstract class which allows de-duplicating incoming tuples based on a configured key. + * Also supports expiry mechanism based on a configurable expiry period configured using {@link TimeBucketAssigner} + * in {@link ManagedTimeUnifiedStateImpl} + * + * @param <T> type of events + */ +@Evolving +@OperatorAnnotation(checkpointableWithinAppWindow = false) +public abstract class AbstractDeduper<T> +implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener +{ + /** + * The input port on which events are received.
+ */ + public final transient DefaultInputPort<T> input = new DefaultInputPort<T>() + { +@Override +public final void process(T tuple) +{ + processTuple(tuple); +} + }; + + /** + * The output port on which deduped events are emitted. + */ + public final transient DefaultOutputPort<T> unique = new DefaultOutputPort<>(); + + /** + * The output port on which duplicate events are emitted. + */ + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<T> duplicate = new DefaultOutputPort<>(); + + /** + * The output port on which expired events are emitted. + */ + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<T> expired = new DefaultOutputPort<>(); + + /** + * Whether or not the order of tuples is maintained. + */ + private boolean orderedOutput = false; + + @NotNull + protected final ManagedTimeUnifiedStateImpl managedState = new ManagedTimeUnifiedStateImpl(); + + /** + * Map to hold the result of processing a tuple (unique, duplicate, expired or error) until previous + * tuples get processed. This is used only when {@link #orderedOutput} is true. + */ + private transient Map decisions; + private transient long sleepMillis;
[GitHub] apex-malhar pull request #335: [Review Only] APEXMALHAR-1701 Deduper with Ma...
Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/335#discussion_r73132603 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java --- @@ -0,0 +1,462 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.dedup; + +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import javax.validation.constraints.NotNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.state.BucketedState; +import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl; +import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner; +import org.apache.hadoop.classification.InterfaceStability.Evolving; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.Futures; + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.Operator.ActivationListener; +import com.datatorrent.api.annotation.OperatorAnnotation; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.lib.fileaccess.FileAccessFSImpl; +import com.datatorrent.lib.fileaccess.TFileImpl; +import com.datatorrent.netlet.util.Slice; + +/** + * Abstract class which allows de-duplicating incoming tuples based on a configured key. + * Also supports expiry mechanism based on a configurable expiry period configured using {@link TimeBucketAssigner} + * in {@link ManagedTimeUnifiedStateImpl} + * + * @param <T> type of events + */ +@Evolving +@OperatorAnnotation(checkpointableWithinAppWindow = false) +public abstract class AbstractDeduper<T> +implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener +{ + /** + * The input port on which events are received.
+ */ + public final transient DefaultInputPort<T> input = new DefaultInputPort<T>() + { +@Override +public final void process(T tuple) +{ + processTuple(tuple); +} + }; + + /** + * The output port on which deduped events are emitted. + */ + public final transient DefaultOutputPort<T> unique = new DefaultOutputPort<>(); + + /** + * The output port on which duplicate events are emitted. + */ + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<T> duplicate = new DefaultOutputPort<>(); + + /** + * The output port on which expired events are emitted. + */ + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<T> expired = new DefaultOutputPort<>(); + + /** + * Whether or not the order of tuples is maintained. + */ + private boolean orderedOutput = false; + + @NotNull + protected final ManagedTimeUnifiedStateImpl managedState = new ManagedTimeUnifiedStateImpl(); + + /** + * Map to hold the result of processing a tuple (unique, duplicate, expired or error) until previous + * tuples get processed. This is used only when {@link #orderedOutput} is true. + */ + private transient Map decisions; + private transient long sleepMillis; + private transient Map waitingEvents = Maps.newLinkedHashMap(); + private transient Map asyncEvents = Maps.newLinkedHashMap(); + + // Metrics + @AutoMetric + private transient long uniqueEvents; +
[jira] [Commented] (APEXMALHAR-1701) Deduper : create a deduper backed by Managed State
[ https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403745#comment-15403745 ] ASF GitHub Bot commented on APEXMALHAR-1701: Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/335#discussion_r73132107 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java --- @@ -0,0 +1,462 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.dedup; + +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import javax.validation.constraints.NotNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.state.BucketedState; +import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl; +import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner; +import org.apache.hadoop.classification.InterfaceStability.Evolving; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.Futures; + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.Operator.ActivationListener; +import com.datatorrent.api.annotation.OperatorAnnotation; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.lib.fileaccess.FileAccessFSImpl; +import com.datatorrent.lib.fileaccess.TFileImpl; +import com.datatorrent.netlet.util.Slice; + +/** + * Abstract class which allows de-duplicating incoming tuples based on a configured key. + * Also supports expiry mechanism based on a configurable expiry period configured using {@link TimeBucketAssigner} --- End diff -- done > Deduper : create a deduper backed by Managed State > -- > > Key: APEXMALHAR-1701 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-1701 > Project: Apache Apex Malhar > Issue Type: Task > Components: algorithms >Reporter: Chandni Singh >Assignee: Chandni Singh > > Need a de-deduplicator operator that is based on Managed State. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] apex-malhar pull request #335: [Review Only] APEXMALHAR-1701 Deduper with Ma...
Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/335#discussion_r73132123 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java --- @@ -0,0 +1,463 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.dedup; + +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import javax.validation.constraints.NotNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.state.BucketedState; +import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl; +import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner; +import org.apache.hadoop.classification.InterfaceStability.Evolving; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.Futures; + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.Operator.ActivationListener; +import com.datatorrent.api.annotation.OperatorAnnotation; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.lib.fileaccess.FileAccessFSImpl; +import com.datatorrent.lib.fileaccess.TFileImpl; +import com.datatorrent.netlet.util.Slice; + +/** + * Abstract class which allows de-duplicating incoming tuples based on a configured key. + * Also supports expiry mechanism based on a configurable expiry period configured using {@link TimeBucketAssigner} + * in {@link ManagedTimeUnifiedStateImpl} + * + * @param type of events + */ +@Evolving +@OperatorAnnotation(checkpointableWithinAppWindow = false) +public abstract class AbstractDeduper --- End diff -- done --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
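For readers following the review, the Javadoc quoted above describes de-duplication on a configured key with an expiry period, and three output ports (unique, duplicate, expired). A minimal, self-contained sketch of that three-way decision might look like the following. It is not the AbstractDeduper code from the pull request: the class `DedupSketch`, its in-memory `HashMap`, the `String` key and the `expiryMillis` parameter are all hypothetical stand-ins, and the real operator keeps its state in ManagedTimeUnifiedStateImpl rather than in a map.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; not the AbstractDeduper implementation.
public class DedupSketch
{
  // Mirrors the three output ports named in the quoted diff.
  public enum Decision { UNIQUE, DUPLICATE, EXPIRED }

  private final long expiryMillis;                          // assumed expiry period
  private final Map<String, Long> seen = new HashMap<>();   // key -> event time of first sighting
  private long latestTime = Long.MIN_VALUE;                 // highest event time observed so far

  public DedupSketch(long expiryMillis)
  {
    this.expiryMillis = expiryMillis;
  }

  public Decision process(String key, long eventTime)
  {
    latestTime = Math.max(latestTime, eventTime);
    // An event older than the expiry window is reported as expired and not stored.
    if (eventTime < latestTime - expiryMillis) {
      return Decision.EXPIRED;
    }
    // A key that was already stored is a duplicate.
    if (seen.containsKey(key)) {
      return Decision.DUPLICATE;
    }
    seen.put(key, eventTime);
    return Decision.UNIQUE;
  }
}
```

In this sketch an expired event is never stored, so a later in-window event with the same key is still treated as unique. Whether the operator under review behaves the same way is not stated in the quoted diff.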
[jira] [Commented] (APEXMALHAR-1701) Deduper : create a deduper backed by Managed State
[ https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403623#comment-15403623 ] ASF GitHub Bot commented on APEXMALHAR-1701: Github user chinmaykolhatkar commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/335#discussion_r73115138 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java --- @@ -0,0 +1,462 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.dedup; + +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import javax.validation.constraints.NotNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.state.BucketedState; +import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl; +import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner; +import org.apache.hadoop.classification.InterfaceStability.Evolving; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.Futures; + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.Operator.ActivationListener; +import com.datatorrent.api.annotation.OperatorAnnotation; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.lib.fileaccess.FileAccessFSImpl; +import com.datatorrent.lib.fileaccess.TFileImpl; +import com.datatorrent.netlet.util.Slice; + +/** + * Abstract class which allows de-duplicating incoming tuples based on a configured key. + * Also supports expiry mechanism based on a configurable expiry period configured using {@link TimeBucketAssigner} + * in {@link ManagedTimeUnifiedStateImpl} + * + * @param type of events + */ +@Evolving +@OperatorAnnotation(checkpointableWithinAppWindow = false) +public abstract class AbstractDeduper +implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener +{ + /** + * The input port on which events are received. 
+ */ + public final transient DefaultInputPort input = new DefaultInputPort() + { +@Override +public final void process(T tuple) +{ + processTuple(tuple); +} + }; + + /** + * The output port on which deduped events are emitted. + */ + public final transient DefaultOutputPort unique = new DefaultOutputPort<>(); + + /** + * The output port on which duplicate events are emitted. + */ + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort duplicate = new DefaultOutputPort<>(); + + /** + * The output port on which expired events are emitted. + */ + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort expired = new DefaultOutputPort<>(); + + /** + * Whether or not the order of tuples be maintained + */ + private boolean orderedOutput = false; + + @NotNull + protected final ManagedTimeUnifiedStateImpl managedState = new ManagedTimeUnifiedStateImpl(); + + /** + * Map to hold the result of a tuple processing (unique, duplicate, expired or error) until previous + * tuples get processed. This is used only when {@link #orderedOutput} is true. + */ + private transient Mapdecisions; + private transient long sleepMillis;
[GitHub] apex-malhar pull request #335: [Review Only] APEXMALHAR-1701 Deduper with Ma...
Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/335#discussion_r73115146 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/dedup/DeduperTimeBasedPOJOImpl.java --- @@ -0,0 +1,209 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.dedup; + +import javax.validation.constraints.NotNull; + +import org.joda.time.Duration; +import org.joda.time.Instant; + +import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner; +import org.apache.hadoop.classification.InterfaceStability.Evolving; + +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.Context.PortContext; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.Operator.ActivationListener; +import com.datatorrent.api.StreamCodec; +import com.datatorrent.api.annotation.InputPortFieldAnnotation; +import com.datatorrent.lib.util.PojoUtils; +import com.datatorrent.lib.util.PojoUtils.Getter; +import com.datatorrent.netlet.util.Slice; + +@Evolving +public class DeduperTimeBasedPOJOImpl extends AbstractDeduper implements ActivationListener +{ + + // Required properties + @NotNull + private String keyExpression; + + private String timeExpression; + + @NotNull + private long bucketSpan; + + @NotNull + private long expireBefore; + + // Optional + private long referenceInstant = -1; --- End diff -- It is a property for user to supply in seconds. We internally convert it to an instant. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
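To illustrate the reply above (the property is supplied in seconds and converted to an instant internally), here is a minimal sketch of such a conversion. It uses `java.time.Instant` so that it runs without extra dependencies, whereas the quoted diff imports `org.joda.time.Instant`. The class name `ReferenceInstantSketch`, the helper `toInstant` and the fallback argument are hypothetical. The value -1 is the default shown in the diff; treating it as "not set" is an assumption.

```java
import java.time.Instant;

// Hypothetical illustration only; not code from DeduperTimeBasedPOJOImpl.
public class ReferenceInstantSketch
{
  // The quoted diff initialises referenceInstant to -1; treated here as "not set" (assumption).
  static final long UNSET = -1L;

  // Converts a user-supplied value in epoch seconds to an Instant,
  // falling back to the supplied default when the property was left unset.
  static Instant toInstant(long referenceSeconds, Instant fallback)
  {
    return referenceSeconds == UNSET ? fallback : Instant.ofEpochSecond(referenceSeconds);
  }
}
```

Passing the fallback in explicitly keeps the helper deterministic. What the operator actually uses when the property is unset is not shown in the quoted portion of the diff.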
[jira] [Commented] (APEXMALHAR-1701) Deduper : create a deduper backed by Managed State
[ https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403615#comment-15403615 ] ASF GitHub Bot commented on APEXMALHAR-1701: Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/335#discussion_r73114470 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/dedup/DeduperTimeBasedPOJOImpl.java --- @@ -0,0 +1,209 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.dedup; + +import javax.validation.constraints.NotNull; + +import org.joda.time.Duration; +import org.joda.time.Instant; + +import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner; +import org.apache.hadoop.classification.InterfaceStability.Evolving; + +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.Context.PortContext; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.Operator.ActivationListener; +import com.datatorrent.api.StreamCodec; +import com.datatorrent.api.annotation.InputPortFieldAnnotation; +import com.datatorrent.lib.util.PojoUtils; +import com.datatorrent.lib.util.PojoUtils.Getter; +import com.datatorrent.netlet.util.Slice; + +@Evolving +public class DeduperTimeBasedPOJOImpl extends AbstractDeduper implements ActivationListener --- End diff -- Done > Deduper : create a deduper backed by Managed State > -- > > Key: APEXMALHAR-1701 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-1701 > Project: Apache Apex Malhar > Issue Type: Task > Components: algorithms >Reporter: Chandni Singh >Assignee: Chandni Singh > > Need a de-deduplicator operator that is based on Managed State. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (APEXMALHAR-1701) Deduper : create a deduper backed by Managed State
[ https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403597#comment-15403597 ] ASF GitHub Bot commented on APEXMALHAR-1701: Github user chinmaykolhatkar commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/335#discussion_r73112564 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java --- @@ -0,0 +1,462 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.dedup; + +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import javax.validation.constraints.NotNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.state.BucketedState; +import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl; +import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner; +import org.apache.hadoop.classification.InterfaceStability.Evolving; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.Futures; + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.Operator.ActivationListener; +import com.datatorrent.api.annotation.OperatorAnnotation; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.lib.fileaccess.FileAccessFSImpl; +import com.datatorrent.lib.fileaccess.TFileImpl; +import com.datatorrent.netlet.util.Slice; + +/** + * Abstract class which allows de-duplicating incoming tuples based on a configured key. + * Also supports expiry mechanism based on a configurable expiry period configured using {@link TimeBucketAssigner} + * in {@link ManagedTimeUnifiedStateImpl} + * + * @param type of events + */ +@Evolving +@OperatorAnnotation(checkpointableWithinAppWindow = false) +public abstract class AbstractDeduper +implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener +{ + /** + * The input port on which events are received. 
+ */ + public final transient DefaultInputPort input = new DefaultInputPort() + { +@Override +public final void process(T tuple) +{ + processTuple(tuple); +} + }; + + /** + * The output port on which deduped events are emitted. + */ + public final transient DefaultOutputPort unique = new DefaultOutputPort<>(); + + /** + * The output port on which duplicate events are emitted. + */ + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort duplicate = new DefaultOutputPort<>(); + + /** + * The output port on which expired events are emitted. + */ + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort expired = new DefaultOutputPort<>(); + + /** + * Whether or not the order of tuples be maintained + */ + private boolean orderedOutput = false; + + @NotNull + protected final ManagedTimeUnifiedStateImpl managedState = new ManagedTimeUnifiedStateImpl(); + + /** + * Map to hold the result of a tuple processing (unique, duplicate, expired or error) until previous + * tuples get processed. This is used only when {@link #orderedOutput} is true. + */ + private transient Mapdecisions; + private transient long sleepMillis;
[GitHub] apex-malhar pull request #335: [Review Only] APEXMALHAR-1701 Deduper with Ma...
Github user chinmaykolhatkar commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/335#discussion_r73112564 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java --- @@ -0,0 +1,462 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.dedup; + +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import javax.validation.constraints.NotNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.state.BucketedState; +import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl; +import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner; +import org.apache.hadoop.classification.InterfaceStability.Evolving; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.Futures; + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.Operator.ActivationListener; +import com.datatorrent.api.annotation.OperatorAnnotation; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.lib.fileaccess.FileAccessFSImpl; +import com.datatorrent.lib.fileaccess.TFileImpl; +import com.datatorrent.netlet.util.Slice; + +/** + * Abstract class which allows de-duplicating incoming tuples based on a configured key. + * Also supports expiry mechanism based on a configurable expiry period configured using {@link TimeBucketAssigner} + * in {@link ManagedTimeUnifiedStateImpl} + * + * @param type of events + */ +@Evolving +@OperatorAnnotation(checkpointableWithinAppWindow = false) +public abstract class AbstractDeduper +implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener +{ + /** + * The input port on which events are received. 
+ */ + public final transient DefaultInputPort input = new DefaultInputPort() + { +@Override +public final void process(T tuple) +{ + processTuple(tuple); +} + }; + + /** + * The output port on which deduped events are emitted. + */ + public final transient DefaultOutputPort unique = new DefaultOutputPort<>(); + + /** + * The output port on which duplicate events are emitted. + */ + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort duplicate = new DefaultOutputPort<>(); + + /** + * The output port on which expired events are emitted. + */ + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort expired = new DefaultOutputPort<>(); + + /** + * Whether or not the order of tuples be maintained + */ + private boolean orderedOutput = false; + + @NotNull + protected final ManagedTimeUnifiedStateImpl managedState = new ManagedTimeUnifiedStateImpl(); + + /** + * Map to hold the result of a tuple processing (unique, duplicate, expired or error) until previous + * tuples get processed. This is used only when {@link #orderedOutput} is true. + */ + private transient Mapdecisions; + private transient long sleepMillis; + private transient Map waitingEvents = Maps.newLinkedHashMap(); + private transient Map asyncEvents = Maps.newLinkedHashMap(); + + // Metrics + @AutoMetric + private transient long uniqueEvents;
[jira] [Commented] (APEXMALHAR-1701) Deduper : create a deduper backed by Managed State
[ https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403513#comment-15403513 ]

ASF GitHub Bot commented on APEXMALHAR-1701:

Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73104101

--- Diff: library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java ---
@@ -0,0 +1,462 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
+import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Futures;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Abstract class which allows de-duplicating incoming tuples based on a configured key.
+ * Also supports expiry mechanism based on a configurable expiry period configured using {@link TimeBucketAssigner}
+ * in {@link ManagedTimeUnifiedStateImpl}
+ *
+ * @param <T> type of events
+ */
+@Evolving
+@OperatorAnnotation(checkpointableWithinAppWindow = false)
+public abstract class AbstractDeduper<T>
+    implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener
+{
+  /**
+   * The input port on which events are received.
+   */
+  public final transient DefaultInputPort<T> input = new DefaultInputPort<T>()
+  {
+    @Override
+    public final void process(T tuple)
+    {
+      processTuple(tuple);
+    }
+  };
+
+  /**
+   * The output port on which deduped events are emitted.
+   */
+  public final transient DefaultOutputPort<T> unique = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which duplicate events are emitted.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<T> duplicate = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which expired events are emitted.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<T> expired = new DefaultOutputPort<>();
+
+  /**
+   * Whether or not the order of tuples be maintained
+   */
+  private boolean orderedOutput = false;
+
+  @NotNull
+  protected final ManagedTimeUnifiedStateImpl managedState = new ManagedTimeUnifiedStateImpl();
+
+  /**
+   * Map to hold the result of a tuple processing (unique, duplicate, expired or error) until previous
+   * tuples get processed. This is used only when {@link #orderedOutput} is true.
+   */
+  private transient Map decisions;
+  private transient long sleepMillis;
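For readers following the thread, the class Javadoc quoted above describes de-duplication on a configured key with a time-based expiry, and three outcomes per tuple (unique, duplicate, expired). A minimal in-memory sketch of that classification might look like the following. It is not the code under review in the pull request: the class name `DedupSketch`, the `classify` method, the `String` key, and the "latest event time minus expiry period" rule are all assumptions made for illustration, and the real operator keeps its state in `ManagedTimeUnifiedStateImpl` rather than in a `HashMap`.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical illustration only; not the AbstractDeduper code from the pull request.
class DedupSketch
{
  enum Decision { UNIQUE, DUPLICATE, EXPIRED }

  // Assumed expiry period, in the same unit as the event times.
  private final long expiryMillis;
  // Highest event time seen so far; event times are assumed to be non-negative.
  private long latestTime = 0L;
  // Key -> event time of the first tuple seen with that key.
  private final Map<String, Long> seen = new HashMap<>();

  DedupSketch(long expiryMillis)
  {
    this.expiryMillis = expiryMillis;
  }

  Decision classify(String key, long eventTime)
  {
    if (eventTime > latestTime) {
      latestTime = eventTime;
      // Forget keys that have fallen out of the expiry window.
      Iterator<Map.Entry<String, Long>> it = seen.entrySet().iterator();
      while (it.hasNext()) {
        if (it.next().getValue() < latestTime - expiryMillis) {
          it.remove();
        }
      }
    }
    if (eventTime < latestTime - expiryMillis) {
      return Decision.EXPIRED;   // too old to be checked against stored keys
    }
    if (seen.containsKey(key)) {
      return Decision.DUPLICATE; // key already seen inside the window
    }
    seen.put(key, eventTime);
    return Decision.UNIQUE;      // first occurrence inside the window
  }
}
```

In the operator quoted above, each outcome would go to its own port (`unique`, `duplicate`, `expired`); the sketch returns the decision instead so that it stays independent of the Apex API.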
[jira] [Commented] (APEXMALHAR-1701) Deduper : create a deduper backed by Managed State
[ https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403502#comment-15403502 ]

ASF GitHub Bot commented on APEXMALHAR-1701:

Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73103038

--- Diff: library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java ---
@@ -0,0 +1,462 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
+import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Futures;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Abstract class which allows de-duplicating incoming tuples based on a configured key.
+ * Also supports expiry mechanism based on a configurable expiry period configured using {@link TimeBucketAssigner}
+ * in {@link ManagedTimeUnifiedStateImpl}
+ *
+ * @param <T> type of events
+ */
+@Evolving
+@OperatorAnnotation(checkpointableWithinAppWindow = false)
+public abstract class AbstractDeduper<T>
+    implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener
+{
+  /**
+   * The input port on which events are received.
+   */
+  public final transient DefaultInputPort<T> input = new DefaultInputPort<T>()
+  {
+    @Override
+    public final void process(T tuple)
+    {
+      processTuple(tuple);
+    }
+  };
+
+  /**
+   * The output port on which deduped events are emitted.
+   */
+  public final transient DefaultOutputPort<T> unique = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which duplicate events are emitted.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<T> duplicate = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which expired events are emitted.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<T> expired = new DefaultOutputPort<>();
+
+  /**
+   * Whether or not the order of tuples be maintained
+   */
+  private boolean orderedOutput = false;
+
+  @NotNull
+  protected final ManagedTimeUnifiedStateImpl managedState = new ManagedTimeUnifiedStateImpl();
+
+  /**
+   * Map to hold the result of a tuple processing (unique, duplicate, expired or error) until previous
+   * tuples get processed. This is used only when {@link #orderedOutput} is true.
+   */
+  private transient Map decisions;
+  private transient long sleepMillis;
[jira] [Commented] (APEXMALHAR-1701) Deduper : create a deduper backed by Managed State
[ https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403500#comment-15403500 ]

ASF GitHub Bot commented on APEXMALHAR-1701:

Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73102939

--- Diff: library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java ---
@@ -0,0 +1,462 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
+import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Futures;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Abstract class which allows de-duplicating incoming tuples based on a configured key.
+ * Also supports expiry mechanism based on a configurable expiry period configured using {@link TimeBucketAssigner}
+ * in {@link ManagedTimeUnifiedStateImpl}
+ *
+ * @param <T> type of events
+ */
+@Evolving
+@OperatorAnnotation(checkpointableWithinAppWindow = false)
+public abstract class AbstractDeduper<T>
+    implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener
+{
+  /**
+   * The input port on which events are received.
+   */
+  public final transient DefaultInputPort<T> input = new DefaultInputPort<T>()
+  {
+    @Override
+    public final void process(T tuple)
+    {
+      processTuple(tuple);
+    }
+  };
+
+  /**
+   * The output port on which deduped events are emitted.
+   */
+  public final transient DefaultOutputPort<T> unique = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which duplicate events are emitted.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<T> duplicate = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which expired events are emitted.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<T> expired = new DefaultOutputPort<>();
+
+  /**
+   * Whether or not the order of tuples be maintained
+   */
+  private boolean orderedOutput = false;
+
+  @NotNull
+  protected final ManagedTimeUnifiedStateImpl managedState = new ManagedTimeUnifiedStateImpl();
+
+  /**
+   * Map to hold the result of a tuple processing (unique, duplicate, expired or error) until previous
+   * tuples get processed. This is used only when {@link #orderedOutput} is true.
+   */
+  private transient Map decisions;
+  private transient long sleepMillis;
[GitHub] apex-malhar pull request #335: [Review Only] APEXMALHAR-1701 Deduper with Ma...
Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73102939

--- Diff: library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java ---
@@ -0,0 +1,462 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
+import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Futures;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Abstract class which allows de-duplicating incoming tuples based on a configured key.
+ * Also supports expiry mechanism based on a configurable expiry period configured using {@link TimeBucketAssigner}
+ * in {@link ManagedTimeUnifiedStateImpl}
+ *
+ * @param <T> type of events
+ */
+@Evolving
+@OperatorAnnotation(checkpointableWithinAppWindow = false)
+public abstract class AbstractDeduper<T>
+    implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener
+{
+  /**
+   * The input port on which events are received.
+   */
+  public final transient DefaultInputPort<T> input = new DefaultInputPort<T>()
+  {
+    @Override
+    public final void process(T tuple)
+    {
+      processTuple(tuple);
+    }
+  };
+
+  /**
+   * The output port on which deduped events are emitted.
+   */
+  public final transient DefaultOutputPort<T> unique = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which duplicate events are emitted.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<T> duplicate = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which expired events are emitted.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<T> expired = new DefaultOutputPort<>();
+
+  /**
+   * Whether or not the order of tuples be maintained
+   */
+  private boolean orderedOutput = false;
+
+  @NotNull
+  protected final ManagedTimeUnifiedStateImpl managedState = new ManagedTimeUnifiedStateImpl();
+
+  /**
+   * Map to hold the result of a tuple processing (unique, duplicate, expired or error) until previous
+   * tuples get processed. This is used only when {@link #orderedOutput} is true.
+   */
+  private transient Map decisions;
+  private transient long sleepMillis;
+  private transient Map waitingEvents = Maps.newLinkedHashMap();
+  private transient Map asyncEvents = Maps.newLinkedHashMap();
+
+  // Metrics
+  @AutoMetric
+  private transient long uniqueEvents;
[jira] [Commented] (APEXMALHAR-1701) Deduper : create a deduper backed by Managed State
[ https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403499#comment-15403499 ]

ASF GitHub Bot commented on APEXMALHAR-1701:

Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73102835

--- Diff: library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java ---
@@ -0,0 +1,462 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
+import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Futures;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Abstract class which allows de-duplicating incoming tuples based on a configured key.
+ * Also supports expiry mechanism based on a configurable expiry period configured using {@link TimeBucketAssigner}
+ * in {@link ManagedTimeUnifiedStateImpl}
+ *
+ * @param <T> type of events
+ */
+@Evolving
+@OperatorAnnotation(checkpointableWithinAppWindow = false)
+public abstract class AbstractDeduper<T>
+    implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener
+{
+  /**
+   * The input port on which events are received.
+   */
+  public final transient DefaultInputPort<T> input = new DefaultInputPort<T>()
+  {
+    @Override
+    public final void process(T tuple)
+    {
+      processTuple(tuple);
+    }
+  };
+
+  /**
+   * The output port on which deduped events are emitted.
+   */
+  public final transient DefaultOutputPort<T> unique = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which duplicate events are emitted.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<T> duplicate = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which expired events are emitted.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<T> expired = new DefaultOutputPort<>();
+
+  /**
+   * Whether or not the order of tuples be maintained
+   */
+  private boolean orderedOutput = false;
+
+  @NotNull
+  protected final ManagedTimeUnifiedStateImpl managedState = new ManagedTimeUnifiedStateImpl();
+
+  /**
+   * Map to hold the result of a tuple processing (unique, duplicate, expired or error) until previous
+   * tuples get processed. This is used only when {@link #orderedOutput} is true.
+   */
+  private transient Map decisions;
+  private transient long sleepMillis;
[GitHub] apex-malhar pull request #335: [Review Only] APEXMALHAR-1701 Deduper with Ma...
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73102835

--- Diff: library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java ---
@@ -0,0 +1,462 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
+import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Futures;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Abstract class which allows de-duplicating incoming tuples based on a configured key.
+ * Also supports expiry mechanism based on a configurable expiry period configured using {@link TimeBucketAssigner}
+ * in {@link ManagedTimeUnifiedStateImpl}
+ *
+ * @param <T> type of events
+ */
+@Evolving
+@OperatorAnnotation(checkpointableWithinAppWindow = false)
+public abstract class AbstractDeduper<T>
+    implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener
+{
+  /**
+   * The input port on which events are received.
+   */
+  public final transient DefaultInputPort<T> input = new DefaultInputPort<T>()
+  {
+    @Override
+    public final void process(T tuple)
+    {
+      processTuple(tuple);
+    }
+  };
+
+  /**
+   * The output port on which deduped events are emitted.
+   */
+  public final transient DefaultOutputPort<T> unique = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which duplicate events are emitted.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<T> duplicate = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which expired events are emitted.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<T> expired = new DefaultOutputPort<>();
+
+  /**
+   * Whether or not the order of tuples be maintained
+   */
+  private boolean orderedOutput = false;
+
+  @NotNull
+  protected final ManagedTimeUnifiedStateImpl managedState = new ManagedTimeUnifiedStateImpl();
+
+  /**
+   * Map to hold the result of a tuple processing (unique, duplicate, expired or error) until previous
+   * tuples get processed. This is used only when {@link #orderedOutput} is true.
+   */
+  private transient Map decisions;
+  private transient long sleepMillis;
+  private transient Map waitingEvents = Maps.newLinkedHashMap();
+  private transient Map asyncEvents = Maps.newLinkedHashMap();
+
+  // Metrics
+  @AutoMetric
+  private transient long uniqueEvents;
+
[jira] [Commented] (APEXMALHAR-1701) Deduper : create a deduper backed by Managed State
[ https://issues.apache.org/jira/browse/APEXMALHAR-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403492#comment-15403492 ]

ASF GitHub Bot commented on APEXMALHAR-1701:

Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73102463

--- Diff: library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java ---
@@ -0,0 +1,462 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
+import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Futures;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Abstract class which allows de-duplicating incoming tuples based on a configured key.
+ * Also supports expiry mechanism based on a configurable expiry period configured using {@link TimeBucketAssigner}
+ * in {@link ManagedTimeUnifiedStateImpl}
+ *
+ * @param <T> type of events
+ */
+@Evolving
+@OperatorAnnotation(checkpointableWithinAppWindow = false)
+public abstract class AbstractDeduper<T>
+    implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener
+{
+  /**
+   * The input port on which events are received.
+   */
+  public final transient DefaultInputPort<T> input = new DefaultInputPort<T>()
+  {
+    @Override
+    public final void process(T tuple)
+    {
+      processTuple(tuple);
+    }
+  };
+
+  /**
+   * The output port on which deduped events are emitted.
+   */
+  public final transient DefaultOutputPort<T> unique = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which duplicate events are emitted.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<T> duplicate = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which expired events are emitted.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<T> expired = new DefaultOutputPort<>();
+
+  /**
+   * Whether or not the order of tuples be maintained
+   */
+  private boolean orderedOutput = false;
+
+  @NotNull
+  protected final ManagedTimeUnifiedStateImpl managedState = new ManagedTimeUnifiedStateImpl();
+
+  /**
+   * Map to hold the result of a tuple processing (unique, duplicate, expired or error) until previous
+   * tuples get processed. This is used only when {@link #orderedOutput} is true.
+   */
+  private transient Map decisions;
+  private transient long sleepMillis;
[GitHub] apex-malhar pull request #335: [Review Only] APEXMALHAR-1701 Deduper with Ma...
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/335#discussion_r73102463

--- Diff: library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java ---
@@ -0,0 +1,462 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
+import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Futures;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Abstract class which allows de-duplicating incoming tuples based on a configured key.
+ * Also supports expiry mechanism based on a configurable expiry period configured using {@link TimeBucketAssigner}
+ * in {@link ManagedTimeUnifiedStateImpl}
+ *
+ * @param <T> type of events
+ */
+@Evolving
+@OperatorAnnotation(checkpointableWithinAppWindow = false)
+public abstract class AbstractDeduper<T>
+    implements Operator, Operator.IdleTimeHandler, ActivationListener, Operator.CheckpointNotificationListener
+{
+  /**
+   * The input port on which events are received.
+   */
+  public final transient DefaultInputPort<T> input = new DefaultInputPort<T>()
+  {
+    @Override
+    public final void process(T tuple)
+    {
+      processTuple(tuple);
+    }
+  };
+
+  /**
+   * The output port on which deduped events are emitted.
+   */
+  public final transient DefaultOutputPort<T> unique = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which duplicate events are emitted.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<T> duplicate = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which expired events are emitted.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<T> expired = new DefaultOutputPort<>();
+
+  /**
+   * Whether or not the order of tuples be maintained
+   */
+  private boolean orderedOutput = false;
+
+  @NotNull
+  protected final ManagedTimeUnifiedStateImpl managedState = new ManagedTimeUnifiedStateImpl();
+
+  /**
+   * Map to hold the result of a tuple processing (unique, duplicate, expired or error) until previous
+   * tuples get processed. This is used only when {@link #orderedOutput} is true.
+   */
+  private transient Map decisions;
+  private transient long sleepMillis;
+  private transient Map waitingEvents = Maps.newLinkedHashMap();
+  private transient Map asyncEvents = Maps.newLinkedHashMap();
+
+  // Metrics
+  @AutoMetric
+  private transient long uniqueEvents;
+
[jira] [Resolved] (APEXMALHAR-2153) Add user documentation for Enricher on apex docs
[ https://issues.apache.org/jira/browse/APEXMALHAR-2153?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bhupesh Chawda resolved APEXMALHAR-2153.

Resolution: Done
Fix Version/s: 3.5.0

Merged

> Add user documentation for Enricher on apex docs
>
> Key: APEXMALHAR-2153
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2153
> Project: Apache Apex Malhar
> Issue Type: Documentation
> Reporter: Chinmay Kolhatkar
> Assignee: Chinmay Kolhatkar
> Fix For: 3.5.0
>
> Add user documentation for Enricher on apex docs
[jira] [Commented] (APEXMALHAR-2153) Add user documentation for Enricher on apex docs
[ https://issues.apache.org/jira/browse/APEXMALHAR-2153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403458#comment-15403458 ]

ASF GitHub Bot commented on APEXMALHAR-2153:

Github user asfgit closed the pull request at:

https://github.com/apache/apex-malhar/pull/353

> Add user documentation for Enricher on apex docs
>
> Key: APEXMALHAR-2153
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2153
> Project: Apache Apex Malhar
> Issue Type: Documentation
> Reporter: Chinmay Kolhatkar
> Assignee: Chinmay Kolhatkar
>
> Add user documentation for Enricher on apex docs
[GitHub] apex-malhar pull request #353: APEXMALHAR-2153 Adding user docs for POJOEnri...
Github user asfgit closed the pull request at:

https://github.com/apache/apex-malhar/pull/353