Re: [DISCUSSION] Custom Control Tuples

2016-06-24 Thread Vlad Rozov

+1 for option 1.

Thank you,

Vlad

On 6/24/16 14:35, Bright Chen wrote:

+1
It can also help to shut down the application gracefully.
Bright


On Jun 24, 2016, at 1:35 PM, Siyuan Hua  wrote:

+1

I think it's good to have custom control tuples, and I prefer option 1.

Also, I think we should consider a couple of different callbacks, which could
be operator level (triggered when an operator receives a control tuple) or
DAG level (triggered when the control tuple has flowed through the whole DAG).
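A minimal sketch of the two callback levels, in Java. All names here (`ControlTupleListener`, `DagCompletionTracker`, `report`) are hypothetical and not part of the Apex API; the sketch assumes each physical operator reports once it has processed the control tuple, and fires the DAG-level callback exactly once, when the last operator reports.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical callback interface illustrating the two levels discussed above.
interface ControlTupleListener
{
  // Operator level: invoked when a single operator receives the control tuple.
  void onOperatorReceived(int operatorId, Object payload);

  // DAG level: invoked once the control tuple has reached every operator in the DAG.
  void onDagCompleted(Object payload);
}

// Hypothetical tracker that turns per-operator notifications into one DAG-level event.
class DagCompletionTracker
{
  private final Set<Integer> allOperators;
  private final Set<Integer> reached = new HashSet<>();
  private final ControlTupleListener listener;
  private boolean completed;

  DagCompletionTracker(Set<Integer> allOperators, ControlTupleListener listener)
  {
    this.allOperators = new HashSet<>(allOperators);
    this.listener = listener;
  }

  void report(int operatorId, Object payload)
  {
    listener.onOperatorReceived(operatorId, payload);
    if (allOperators.contains(operatorId)) {
      reached.add(operatorId);
    }
    // Fire the DAG-level callback only once, when the last operator has reported.
    if (!completed && reached.containsAll(allOperators)) {
      completed = true;
      listener.onDagCompleted(payload);
    }
  }
}
```

The operator-level callback fires on every report, while repeated reports from the same operator do not re-trigger the DAG-level callback.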

Regards,
Siyuan




On Fri, Jun 24, 2016 at 12:42 PM, David Yan  wrote:


My initial thinking is that the custom control tuples, just like the
existing control tuples, will only be generated from the input operators
and will be propagated downstream to all operators in the DAG. So the NxM
partitioning scenario works just like how other control tuples work, i.e.
the callback will not be called unless all ports have received the control
tuple for a particular window. This creates a little bit of complication
with multiple input operators though.
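The NxM semantics described above amount to a per-window barrier. A minimal sketch, assuming a hypothetical `ControlTupleBarrier` that each downstream partition would hold, with one entry per upstream port: the user callback runs once per window, after all N ports have delivered the control tuple, and a repeated delivery on the same port is not double-counted.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.LongConsumer;

// Hypothetical per-operator barrier: with N upstream partitions, the user callback
// for a window fires only after all N input ports have delivered the control tuple.
class ControlTupleBarrier
{
  private final int upstreamPorts;
  private final Map<Long, Set<Integer>> arrivedByWindow = new HashMap<>();
  private final LongConsumer callback;

  ControlTupleBarrier(int upstreamPorts, LongConsumer callback)
  {
    this.upstreamPorts = upstreamPorts;
    this.callback = callback;
  }

  // Called when the control tuple for windowId arrives on input port portIndex.
  void onControlTuple(long windowId, int portIndex)
  {
    Set<Integer> arrived = arrivedByWindow.computeIfAbsent(windowId, k -> new HashSet<>());
    arrived.add(portIndex); // a set, so a duplicate on the same port is ignored
    if (arrived.size() == upstreamPorts) {
      arrivedByWindow.remove(windowId);
      callback.accept(windowId); // invoked once per window, not N times
    }
  }
}
```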

David


On Fri, Jun 24, 2016 at 12:03 PM, Tushar Gosavi wrote:


+1 for the feature

I am in favor of option 1, but we may need a helper method to avoid a
compiler error on typed ports, as calling port.emit(controlTuple) will
be an error if the types of the control tuple and the port do not match; or a
new method on the output port object, emitControlTuple(ControlTuple).
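A minimal sketch of the second alternative, in Java. `TypedOutputPort` and `CustomControlTuple` are hypothetical stand-ins, not the Apex `DefaultOutputPort` API: `emit(T)` only type-checks for data tuples of type `T`, so control tuples go through a separate `emitControlTuple` method rather than an unchecked cast.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical wrapper for a user-defined control payload (option 1 in the proposal).
final class CustomControlTuple implements Serializable
{
  private static final long serialVersionUID = 1L;
  final Serializable payload;

  CustomControlTuple(Serializable payload)
  {
    this.payload = payload;
  }
}

// Hypothetical typed output port: emit(T) only accepts data tuples of type T,
// so control tuples use a dedicated method and need no cast at the call site.
class TypedOutputPort<T>
{
  final List<Object> sink = new ArrayList<>(); // stands in for the downstream stream

  void emit(T tuple)
  {
    sink.add(tuple);
  }

  void emitControlTuple(CustomControlTuple controlTuple)
  {
    sink.add(controlTuple);
  }
}
```

With a `TypedOutputPort<String>`, `port.emit(new CustomControlTuple(...))` does not compile, which is exactly the error the helper method sidesteps.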

Can you give an example of piggybacking a tuple on the current BEGIN_WINDOW
and END_WINDOW control tuples?

In the case of NxM partitioning, each downstream operator will receive N
control tuples. Will it call the user handler N times for each downstream
operator, or just once?

Regards,
- Tushar.



On Fri, Jun 24, 2016 at 11:52 PM, David Yan wrote:

Hi all,

I would like to propose a new feature to the Apex core engine -- the
support of custom control tuples. Currently, we have control tuples such as
BEGIN_WINDOW, END_WINDOW, CHECKPOINT, and so on, but we don't have
support for applications to insert their own control tuples. The current
way to get around this is to use data tuples and have a separate port
for such tuples that sends tuples to all partitions of the downstream
operators, which is not exactly developer friendly.

We have already seen a number of use cases that can use this feature:

1) Batch support: We need to tell all operators of the physical DAG when a
batch starts and ends, so the operators can do whatever is needed upon
the start or the end of a batch.

2) Watermark: To support the concepts of event time windowing, the
watermark control tuple is needed to tell which windows should be
considered late.

3) Changing operator properties: We do support changing operator
properties on the fly, but with a custom control tuple, the command to
change operator properties can be window aligned for all partitions and
also across the DAG.

4) Recording tuples: Like changing operator properties, we do have this
support now, but only at the individual physical operator level, and
without control over which window to record tuples for. With a custom
control tuple, because a control tuple must belong to a window, all
operators in the DAG can start (and stop) recording for the same windows.
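For the watermark use case (2), a minimal sketch of how a watermark could decide lateness, assuming a window ending at `windowEndMillis` becomes late once the watermark reaches that time, and falls outside the allowed lateness once the watermark passes its end plus the allowed lateness. `LatenessCheck` and both method names are hypothetical, and treating a negative allowed lateness as unbounded is an assumption of this sketch.

```java
// Hypothetical lateness checks driven by a watermark (all times in milliseconds).
final class LatenessCheck
{
  private LatenessCheck()
  {
  }

  // A window is late once the watermark has reached its end time.
  static boolean isLate(long windowEndMillis, long watermarkMillis)
  {
    return watermarkMillis >= windowEndMillis;
  }

  // Late data is still accepted until the watermark passes end + allowed lateness.
  // A negative allowedLatenessMillis is treated as unbounded (assumption of this sketch).
  static boolean isBeyondAllowedLateness(long windowEndMillis, long watermarkMillis, long allowedLatenessMillis)
  {
    if (allowedLatenessMillis < 0) {
      return false;
    }
    return watermarkMillis >= windowEndMillis + allowedLatenessMillis;
  }
}
```

Because the watermark travels as a control tuple that belongs to a specific streaming window, every partition would apply these checks at the same point in the stream.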

I can think of two options to achieve this:

1) a new custom control tuple type that takes a user's serializable object.

2) piggyback on the current BEGIN_WINDOW and END_WINDOW control tuples.
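Option 1 could look roughly like the following minimal sketch. `UserControlTuple`, `BatchBoundary`, and `ControlTupleCodec` are hypothetical names; Java serialization stands in for whatever serializer the engine would actually use, and the window ID field reflects the point that a control tuple must belong to a window.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical option-1 control tuple: carries any user Serializable together
// with the ID of the streaming window it belongs to.
final class UserControlTuple implements Serializable
{
  private static final long serialVersionUID = 1L;
  final long windowId;
  final Serializable payload;

  UserControlTuple(long windowId, Serializable payload)
  {
    this.windowId = windowId;
    this.payload = payload;
  }
}

// Example user payload for the batch use case.
final class BatchBoundary implements Serializable
{
  private static final long serialVersionUID = 1L;
  final boolean start;

  BatchBoundary(boolean start)
  {
    this.start = start;
  }
}

final class ControlTupleCodec
{
  private ControlTupleCodec()
  {
  }

  // Java serialization stands in for the engine's real serializer.
  static byte[] encode(UserControlTuple tuple) throws IOException
  {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(tuple);
    }
    return bytes.toByteArray();
  }

  static UserControlTuple decode(byte[] data) throws IOException, ClassNotFoundException
  {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
      return (UserControlTuple)in.readObject();
    }
  }
}
```

The engine never needs to know the payload type; only the receiving operator inspects it, for example with `instanceof BatchBoundary`.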

Please provide your feedback. Thank you.

David




[jira] [Commented] (APEXMALHAR-2085) Implement Windowed Operators

2016-06-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15348844#comment-15348844
 ] 

ASF GitHub Bot commented on APEXMALHAR-2085:


Github user brightchen commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/319#discussion_r68475212
  
--- Diff: library/src/main/java/org/apache/apex/malhar/lib/window/Window.java ---
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window;
+
+import java.util.Comparator;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * This interface describes the individual window.
+ */
+@InterfaceStability.Evolving
+public interface Window
--- End diff --

I think for Window, we could provide only one method, isWithinWindow(long
time). So for GlobalWindow, we can simply return true. And in SessionWindow,
instead of using a static method to merge the windows, we can just extendWindow().
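A minimal sketch of the suggested single-method design, in Java. `SimpleWindow` and the `extendWindow(long, long)` signature are hypothetical (the pull request's actual `Window` interface body is not shown in the diff above); it assumes a session window covers a half-open interval `[start, end)` and grows by a gap duration.

```java
// Hypothetical single-method window interface, as suggested in the comment.
interface SimpleWindow
{
  boolean isWithinWindow(long time);
}

// The global window contains every timestamp, so it simply returns true.
final class GlobalWindow implements SimpleWindow
{
  @Override
  public boolean isWithinWindow(long time)
  {
    return true;
  }
}

// A session window covers [start, end) and grows in place instead of
// being merged through a static helper.
final class SessionWindow implements SimpleWindow
{
  private long startMillis;
  private long endMillis;

  SessionWindow(long startMillis, long endMillis)
  {
    this.startMillis = startMillis;
    this.endMillis = endMillis;
  }

  @Override
  public boolean isWithinWindow(long time)
  {
    return time >= startMillis && time < endMillis;
  }

  // Extend the session so that it also covers [time, time + gapMillis).
  void extendWindow(long time, long gapMillis)
  {
    startMillis = Math.min(startMillis, time);
    endMillis = Math.max(endMillis, time + gapMillis);
  }
}
```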


> Implement Windowed Operators
> 
>
> Key: APEXMALHAR-2085
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2085
> Project: Apache Apex Malhar
>  Issue Type: New Feature
>Reporter: Siyuan Hua
>Assignee: David Yan
>
> As per several recent discussions in the community, a group of windowed
> operators that delivers window semantics following the Google Dataflow
> model (https://cloud.google.com/dataflow/) is very important.
> The operators should be designed and implemented to support:
> * High-level API
> * Beam translation
> * Easy use with other popular operators
> {panel:title=Operator Hierarchy}
> Hierarchy of the operators:
> The windowed operators should cover all possible transformations that require
> windowing, and batch processing is also treated as a special window called the
> global window.
> [Operator hierarchy diagram: WindowedOperator at the top; CombineOperator, GroupOperator, KeyedOperator, and JoinOperator on the level below it; KeyedCombine, KeyedGroup, and CoGroup on the bottom level.]
> The Combine operation includes all operations that combine all tuples in one
> window into one or a small number of tuples; the Group operation groups all tuples
> in one window; Join and CoGroup are used to join and group tuples from
> different inputs.
> {panel}
> {panel:title=Components}
> * Window Component
> It includes the configuration, the window state that should be checkpointed, etc. It
> should support NonMergibleWindow (fixed or sliding) and MergibleWindow (session).
> * Trigger
> It should support early and late triggers with customizable trigger
> behaviour.
> * Other related components:
> ** Watermark generator: can be plugged into the input source to generate watermarks
> ** Tuple schema support:
> It should handle either a predefined tuple type or provide a declarative API to
> describe the user-defined tuple class
> {panel}
> Most component APIs should be reusable in the High-Level API.
> This is the umbrella ticket; separate tickets will be created for the individual
> components and operators.
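As an illustration of the non-mergeable case (fixed or sliding windows), a minimal sketch of assigning an event timestamp to every window that contains it, assuming windows aligned to epoch 0, non-negative timestamps, and millisecond units. `TimeWindowAssigner.assign` is a hypothetical helper, not part of the proposed API; a fixed window is the special case where the slide equals the size.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical assignment of an event timestamp to non-mergeable time windows,
// assuming windows aligned to epoch 0 and non-negative timestamps.
final class TimeWindowAssigner
{
  private TimeWindowAssigner()
  {
  }

  // Returns the start time of every window [start, start + size) that contains the timestamp.
  // A fixed (tumbling) window is the special case slideMillis == sizeMillis.
  static List<Long> assign(long timestampMillis, long sizeMillis, long slideMillis)
  {
    List<Long> starts = new ArrayList<>();
    long lastStart = timestampMillis - (timestampMillis % slideMillis);
    for (long start = lastStart; start > timestampMillis - sizeMillis; start -= slideMillis) {
      starts.add(start);
    }
    return starts;
  }
}
```

For example, with a size of 1000 ms and a slide of 500 ms, a tuple at 1250 ms falls into the windows starting at 1000 and 500; with a slide of 1000 ms it falls only into the window starting at 1000.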



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] apex-malhar pull request #319: APEXMALHAR-2085: REVIEW ONLY: Operator suppor...

2016-06-24 Thread brightchen
Github user brightchen commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/319#discussion_r68475212
  

I think for Window, we could provide only one method, isWithinWindow(long
time). So for GlobalWindow, we can simply return true. And in SessionWindow,
instead of using a static method to merge the windows, we can just extendWindow().


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] apex-core issue #351: APEXCORE-405 Common API to launch on local mode or clu...

2016-06-24 Thread PramodSSImmaneni
Github user PramodSSImmaneni commented on the issue:

https://github.com/apache/apex-core/pull/351
  
@siyuanh @tushargosavi please take a look, as you have expressed interest in the JIRA




[GitHub] apex-malhar pull request #319: APEXMALHAR-2085: REVIEW ONLY: Operator suppor...

2016-06-24 Thread chinmaykolhatkar
Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/319#discussion_r68474418
  
--- Diff: library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java ---
@@ -0,0 +1,486 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import javax.validation.ValidationException;
+
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+import org.apache.apex.malhar.lib.window.ControlTuple;
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.lib.window.WindowState;
+import org.apache.apex.malhar.lib.window.WindowedOperator;
+import org.apache.apex.malhar.lib.window.WindowedStorage;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.base.Function;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * This is the abstract windowed operator class that implements most of the windowing, triggering, and accumulating
+ * concepts. The subclass of this abstract class is supposed to provide the implementation of how the accumulated
+ * values are stored in the storage.
+ *
+ */
+@InterfaceStability.Evolving
+public abstract class AbstractWindowedOperator
+extends BaseOperator implements WindowedOperator
+{
+
+  protected WindowOption windowOption;
+  protected TriggerOption triggerOption;
+  protected long allowedLatenessMillis = -1;
+  protected WindowedStorage windowStateMap;
+
+  private Function timestampExtractor;
+
+  private long currentWatermark;
+  private boolean triggerAtWatermark;
+  private long earlyTriggerCount;
+  private long earlyTriggerMillis;
+  private long lateTriggerCount;
+  private long lateTriggerMillis;
+  private long currentDerivedTimestamp = -1;
+  private long windowWidthMillis;
+  protected DataStorageT dataStorage;
+  protected DataStorageT retractionStorage;
+  protected AccumulationT accumulation;
+
+  private static final transient Logger LOG = LoggerFactory.getLogger(AbstractWindowedOperator.class);
+
+  public final transient DefaultInputPort input = new DefaultInputPort()
+  {
+    @Override
+    public void process(Tuple tuple)
+    {
+      processTuple(tuple);
+    }
+  };
+
+  // TODO: This port should be removed when Apex Core has native support for custom control tuples
+  @InputPortFieldAnnotation(optional = true)
+  public final transient DefaultInputPort controlInput = new DefaultInputPort()
+  {
+    @Override
+    public void process(ControlTuple tuple)
+    {
+      if (tuple instanceof ControlTuple.Watermark) {
+        processWatermark((ControlTuple.Watermark)tuple);
+      }
+    }
+  };
+
+
+  // TODO: multiple input ports for join operations
+
+  public final transient DefaultOutputPort output = new DefaultOutputPort<>();
+
+  // TODO: This port should be removed when Apex Core has native support for custom control tuples
+  public final transient DefaultOutputPort controlOutput = new DefaultOutputPort<>();
+
+  /**
+   * Process the incoming data 

[jira] [Commented] (APEXCORE-405) Provide an API to launch DAG on the cluster

2016-06-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXCORE-405?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15348824#comment-15348824
 ] 

ASF GitHub Bot commented on APEXCORE-405:
-

Github user PramodSSImmaneni commented on the issue:

https://github.com/apache/apex-core/pull/351
  
@siyuanh @tushargosavi please take a look, as you have expressed interest in the JIRA


> Provide an API to launch DAG on the cluster
> ---
>
> Key: APEXCORE-405
> URL: https://issues.apache.org/jira/browse/APEXCORE-405
> Project: Apache Apex Core
>  Issue Type: Improvement
>Reporter: Pramod Immaneni
>Assignee: Pramod Immaneni
>
> Today an API exists to launch a DAG in local mode, but no such API is
> available to launch the app on the cluster; only a CLI tool is available.
> Provide an API to do this.





[jira] [Commented] (APEXCORE-475) change the YARN_APPLICATION_TYPE from DataTorrent to ApacheApex

2016-06-24 Thread David Yan (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXCORE-475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15348800#comment-15348800
 ] 

David Yan commented on APEXCORE-475:


In a development environment, yes, that can be an issue. 

> change the YARN_APPLICATION_TYPE from DataTorrent to ApacheApex
> ---
>
> Key: APEXCORE-475
> URL: https://issues.apache.org/jira/browse/APEXCORE-475
> Project: Apache Apex Core
>  Issue Type: Improvement
>Affects Versions: 4.0.0
>Reporter: Sandesh
>Priority: Minor
>
> This change is backward incompatible and should not be made in a minor release, as the
> application type is also used to identify Apex applications.





[jira] [Commented] (APEXCORE-475) change the YARN_APPLICATION_TYPE from DataTorrent to ApacheApex

2016-06-24 Thread Sandesh (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXCORE-475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15348793#comment-15348793
 ] 

Sandesh commented on APEXCORE-475:
--

They could be using RTS 3.4 but launching the apps with the 3.5 Apex CLI for
various reasons. I do that all the time.



[jira] [Commented] (APEXMALHAR-2085) Implement Windowed Operators

2016-06-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15348758#comment-15348758
 ] 

ASF GitHub Bot commented on APEXMALHAR-2085:


Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/319#discussion_r68469300
  

[jira] [Commented] (APEXCORE-475) change the YARN_APPLICATION_TYPE from DataTorrent to ApacheApex

2016-06-24 Thread David Yan (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXCORE-475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15348743#comment-15348743
 ] 

David Yan commented on APEXCORE-475:


We don't expect users to have multiple Apex installations with different 
versions on the same cluster.



[jira] [Resolved] (APEXMALHAR-2120) Fix bugs on KafkaInputOperatorTest and AbstractKafkaInputOperator

2016-06-24 Thread bright chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bright chen resolved APEXMALHAR-2120.
-
Resolution: Fixed

> Fix bugs on KafkaInputOperatorTest and AbstractKafkaInputOperator
> -
>
> Key: APEXMALHAR-2120
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2120
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Affects Versions: 3.4.0
>Reporter: bright chen
>Assignee: bright chen
> Fix For: 3.5.0
>
>
> Problems in the unit test class KafkaInputOperatorTest:
> - 'k' is not initialized for each test case
> - The assert was not correct
> - The test case assumes the END_TUPLE will be received after the normal
> tuples, but in fact the tuples can arrive out of order when multiple
> clusters or partitions are used. Here is one example:
> 2016-06-22 08:54:12,827 [main] INFO  kafka.KafkaInputOperatorTest testInputOperator - Number of received/expected tuples: 22/22, testName: testtopic16, tuples: [c1_2, c1_3, c1_6, c1_9, c1_10, c1_13, c1_14, c1_19, c1_20, END_TUPLE, END_TUPLE, c1_1, c1_4, c1_5, c1_7, c1_8, c1_11, c1_12, c1_15, c1_16, c1_17, c1_18]
> - AbstractKafkaInputOperator is implemented as "at least once", but
> the test case assumes "exactly once"
> - RuntimeException: Couldn't replay the offset: see the following log.
> - RuntimeException: OneToOnePartitioner.assign(OneToOnePartitioner.java:52)
> or OneToManyPartitioner.assign(OneToManyPartitioner.java:57), which appears to
> be caused by a NullPointerException. See the following log.
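For the ordering and at-least-once items above, one option is an order-insensitive check: compare the set of distinct data tuples against the expected set and count END_TUPLE markers, rather than relying on arrival order or exact counts. A minimal sketch; `TupleVerifier.receivedAll` is hypothetical and not the code in the actual fix.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Hypothetical order-insensitive check suitable for an "at least once" source.
final class TupleVerifier
{
  private TupleVerifier()
  {
  }

  // True if every expected data tuple arrived at least once, nothing unexpected arrived,
  // and at least expectedEndTuples END_TUPLE markers were seen, in any order.
  static boolean receivedAll(Collection<String> received, Collection<String> expectedData, int expectedEndTuples)
  {
    Set<String> data = new HashSet<>();
    int endTuples = 0;
    for (String tuple : received) {
      if ("END_TUPLE".equals(tuple)) {
        endTuples++;
      } else {
        data.add(tuple); // duplicates collapse, so replays do not fail the check
      }
    }
    return data.equals(new HashSet<>(expectedData)) && endTuples >= expectedEndTuples;
  }
}
```

With the sample log above (20 data tuples and 2 END_TUPLE markers, interleaved), this check passes regardless of arrival order or duplicate delivery.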
> 
> Problems in AbstractKafkaInputOperator:
> 
> - RuntimeException: Couldn't replay the offset:
> For test case KafkaInputOperatorTest.testIdempotentInputOperatorWithFailure()
> with scenario {true, true, "one_to_many"}
> (multi-cluster: true, multi-partition: true, partition: "one_to_many"), the operator
> throws the following exception and the Collector Module doesn't collect any data.
> 2016-06-15 10:43:56,358 [1/Kafka inputtesttopic0:KafkaSinglePortInputOperator] INFO stram.StramLocalCluster log - container-6 msg: Stopped running due to an exception.
> java.lang.RuntimeException: Couldn't replay the offset
>   at org.apache.apex.malhar.kafka.KafkaConsumerWrapper.emitImmediately(KafkaConsumerWrapper.java:146)
>   at org.apache.apex.malhar.kafka.AbstractKafkaInputOperator.replay(AbstractKafkaInputOperator.java:261)
>   at org.apache.apex.malhar.kafka.AbstractKafkaInputOperator.beginWindow(AbstractKafkaInputOperator.java:250)
>   at com.datatorrent.stram.engine.InputNode.run(InputNode.java:122)
>   at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1407)
> Caused by: org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partition: testtopic0-1
>   at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:288)
>   at org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:167)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1302)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:895)
> 
> - ConcurrentModificationException
> 2016-06-16 10:14:32,400 [1/Kafka inputtesttopic4:KafkaSinglePortInputOperator] ERROR engine.StreamingContainer run - Shutdown of operator OperatorDeployInfo[id=1,name=Kafka inputtesttopic4,type=INPUT,checkpoint={, 0, 0},inputs=[],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=outputPort,streamId=Kafka messagetesttopic4,bufferServer=MacBook-Pro-2.local]]] failed due to an exception.
> java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
>   at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1324)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1255)
>   at org.apache.apex.malhar.kafka.KafkaConsumerWrapper.stop(KafkaConsumerWrapper.java:340)
>   at org.apache.apex.malhar.kafka.AbstractKafkaInputOperator.deactivate(AbstractKafkaInputOperator.java:184)
>   at com.datatorrent.stram.engine.Node.deactivate(Node.java:646)
>   at com.datatorrent.stram.engine.StreamingContainer.teardownNode(StreamingContainer.java:1347)
>   at
> 
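For the ConcurrentModificationException, the usual pattern is to confine the consumer to the thread that polls it and let other threads only signal shutdown; with the real `KafkaConsumer`, `wakeup()` is the one method documented as safe to call from another thread. A minimal, dependency-free sketch, assuming a hypothetical `SingleThreadClient` that mimics the single-thread check seen in the trace above; this is not the change made in the actual fix.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a client like KafkaConsumer that rejects use from
// any thread other than the one that first used it.
class SingleThreadClient
{
  private Thread owner;
  boolean closed;

  private void acquire()
  {
    Thread current = Thread.currentThread();
    if (owner == null) {
      owner = current;
    } else if (owner != current) {
      throw new ConcurrentModificationException("client is not safe for multi-threaded access");
    }
  }

  void poll()
  {
    acquire();
  }

  void close()
  {
    acquire();
    closed = true;
  }
}

// The polling thread owns the client; stop() only signals, and the owner closes it.
class ConsumerLoop implements Runnable
{
  private final SingleThreadClient client = new SingleThreadClient();
  private final AtomicBoolean running = new AtomicBoolean(true);

  @Override
  public void run()
  {
    try {
      while (running.get()) {
        client.poll();
        Thread.yield();
      }
    } finally {
      client.close(); // closed on the same thread that polled
    }
  }

  // Safe to call from any thread: it never touches the client directly.
  void stop()
  {
    running.set(false);
  }

  boolean isClosed()
  {
    return client.closed;
  }
}
```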

Re: [DISCUSSION] Custom Control Tuples

2016-06-24 Thread Bright Chen
+1
It can also help to shut down the application gracefully.
Bright




[jira] [Commented] (APEXMALHAR-2120) Fix bugs on KafkaInputOperatorTest and AbstractKafkaInputOperator

2016-06-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15348699#comment-15348699
 ] 

ASF GitHub Bot commented on APEXMALHAR-2120:


Github user asfgit closed the pull request at:

https://github.com/apache/apex-malhar/pull/321


> Fix bugs on KafkaInputOperatorTest and AbstractKafkaInputOperator
> -
>
> Key: APEXMALHAR-2120
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2120
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Affects Versions: 3.4.0
>Reporter: bright chen
>Assignee: bright chen
> Fix For: 3.5.0
>
>
> Problems in the unit test class KafkaInputOperatorTest:
> - 'k' is not initialized for each test case
> - The assert was not correct
> - The test case assumes the END_TUPLE will be received after all normal
> tuples, but in fact the tuples can arrive out of order when multiple
> clusters or partitions are used. Here is one example: 2016-06-22 08:54:12,827 [main]
> INFO  kafka.KafkaInputOperatorTest testInputOperator - Number of
> received/expected tuples: 22/22, testName: testtopic16, tuples:
> [c1_2, c1_3, c1_6, c1_9, c1_10, c1_13, c1_14, c1_19, c1_20, END_TUPLE,
> END_TUPLE, c1_1, c1_4, c1_5, c1_7, c1_8, c1_11, c1_12, c1_15, c1_16, c1_17,
> c1_18]
> - The operator AbstractKafkaInputOperator is implemented as "at least once", but
> the test case assumes "exactly once"
> - RuntimeException: Couldn't replay the offset: see the following log.
> - RuntimeException: OneToOnePartitioner.assign(OneToOnePartitioner.java:52)
> or OneToManyPartitioner.assign(OneToManyPartitioner.java:57), which appears
> to be caused by a NullPointerException. See the following log.
> 
> Problems in AbstractKafkaInputOperator:
> 
> - RuntimeException: Couldn't replay the offset:
> The test case KafkaInputOperatorTest.testIdempotentInputOperatorWithFailure()
> with scenario
> {true, true, "one_to_many"}
> (multi-cluster: true, multi-partition: true, partition: "one_to_many")
> throws the following exception, and the Collector module didn't collect any data.
> 2016-06-15 10:43:56,358 [1/Kafka 
> inputtesttopic0:KafkaSinglePortInputOperator] INFO stram.StramLocalCluster 
> log - container-6 msg: Stopped running due to an exception. 
> java.lang.RuntimeException: Couldn't replay the offset
> at 
> org.apache.apex.malhar.kafka.KafkaConsumerWrapper.emitImmediately(KafkaConsumerWrapper.java:146)
> at 
> org.apache.apex.malhar.kafka.AbstractKafkaInputOperator.replay(AbstractKafkaInputOperator.java:261)
> at 
> org.apache.apex.malhar.kafka.AbstractKafkaInputOperator.beginWindow(AbstractKafkaInputOperator.java:250)
> at com.datatorrent.stram.engine.InputNode.run(InputNode.java:122)
> at 
> com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1407)
> Caused by: org.apache.kafka.clients.consumer.NoOffsetForPartitionException: 
> Undefined offset with no reset policy for partition: testtopic0-1
> at 
> org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:288)
> at 
> org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:167)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1302)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:895)
> 
> - ConcurrentModificationException
> 2016-06-16 10:14:32,400 [1/Kafka 
> inputtesttopic4:KafkaSinglePortInputOperator] ERROR engine.StreamingContainer 
> run - Shutdown of operator OperatorDeployInfo[id=1,name=Kafka 
> inputtesttopic4,type=INPUT,checkpoint={, 0, 
> 0},inputs=[],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=outputPort,streamId=Kafka
>  messagetesttopic4,bufferServer=MacBook-Pro-2.local]]] failed due to an 
> exception.
> java.util.ConcurrentModificationException: KafkaConsumer is not safe for 
> multi-threaded access
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1324)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1255)
>   at 
> org.apache.apex.malhar.kafka.KafkaConsumerWrapper.stop(KafkaConsumerWrapper.java:340)
>   at 
> org.apache.apex.malhar.kafka.AbstractKafkaInputOperator.deactivate(AbstractKafkaInputOperator.java:184)
>   at com.datatorrent.stram.engine.Node.deactivate(Node.java:646)
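One common way to avoid the ConcurrentModificationException above is to never touch the consumer from the deactivating thread: that thread only signals, and the polling thread closes the consumer itself. The sketch below shows that pattern with a hypothetical `SingleThreadClient` standing in for `KafkaConsumer`; it is not the fix that was merged for APEXMALHAR-2120. With the real client, `KafkaConsumer.wakeup()` is the method intended to be called from another thread (it makes a blocked `poll()` throw `WakeupException`); here `poll()` is assumed to return promptly, so a flag is enough.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Sketch: a client that must only be used from one thread is polled and
 * closed by the same thread; other threads only request the stop.
 */
final class PollingWrapper implements Runnable
{
  /** Hypothetical stand-in for a single-threaded client such as KafkaConsumer. */
  interface SingleThreadClient
  {
    void poll();

    void close();
  }

  private final SingleThreadClient client;
  private final AtomicBoolean running = new AtomicBoolean(true);
  private final CountDownLatch closed = new CountDownLatch(1);

  PollingWrapper(SingleThreadClient client)
  {
    this.client = client;
  }

  @Override
  public void run()
  {
    try {
      while (running.get()) {
        client.poll();  // assumed to return promptly (e.g. short poll timeout)
      }
    } finally {
      client.close();   // closed on the same thread that polled
      closed.countDown();
    }
  }

  /**
   * Callable from any thread (e.g. deactivate()): signals the poller and waits
   * until it has closed the client. Assumes run() was started on its own thread.
   */
  void stop() throws InterruptedException
  {
    running.set(false);
    closed.await();
  }
}
```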

[GitHub] apex-malhar pull request #321: APEXMALHAR-2120 #resolve #comment solve probl...

2016-06-24 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/apex-malhar/pull/321


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (APEXCORE-475) change the YARN_APPLICATION_TYPE from DataTorrent to ApacheApex

2016-06-24 Thread David Yan (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXCORE-475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15348653#comment-15348653
 ] 

David Yan commented on APEXCORE-475:


If we change the application type to ApacheApex for all newly launched apps,
and include both DataTorrent and ApacheApex when retrieving apps, then it
should solve the backward incompatibility problem, right?  We can stop
including "DataTorrent" when retrieving apps once we move to 4.0.
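The proposed transition can be sketched as follows. `ApexAppTypes` and its `App` record are hypothetical stand-ins; in real Hadoop code the launch side would presumably set the type via `ApplicationSubmissionContext.setApplicationType(String)`, and the returned set is what would be passed to `YarnClient.getApplications(Set<String>)`. The exact-match filter here only models the lookup and makes no claim about how YARN compares type names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/**
 * Sketch of the APEXCORE-475 transition: launch with the new type, but look up
 * both the new and the legacy type until the legacy one is dropped in 4.0.
 * Class and method names are illustrative only.
 */
final class ApexAppTypes
{
  static final String CURRENT_TYPE = "ApacheApex";  // set on newly launched apps
  static final String LEGACY_TYPE = "DataTorrent";  // still matched for older apps

  /** Minimal stand-in for a YARN application report (id + type). */
  static final class App
  {
    final String id;
    final String type;

    App(String id, String type)
    {
      this.id = id;
      this.type = type;
    }
  }

  private ApexAppTypes()
  {
  }

  /** Application types to query for; pass false once legacy support is removed. */
  static Set<String> typesToQuery(boolean includeLegacy)
  {
    Set<String> types = new LinkedHashSet<>();
    types.add(CURRENT_TYPE);
    if (includeLegacy) {
      types.add(LEGACY_TYPE);
    }
    return Collections.unmodifiableSet(types);
  }

  /** Ids of the applications whose type is in the wanted set, in input order. */
  static List<String> matchingIds(List<App> apps, Set<String> wanted)
  {
    List<String> ids = new ArrayList<>();
    for (App app : apps) {
      if (wanted.contains(app.type)) {
        ids.add(app.id);
      }
    }
    return ids;
  }
}
```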

> change the YARN_APPLICATION_TYPE from DataTorrent to ApacheApex
> ---
>
> Key: APEXCORE-475
> URL: https://issues.apache.org/jira/browse/APEXCORE-475
> Project: Apache Apex Core
>  Issue Type: Improvement
>Affects Versions: 4.0.0
>Reporter: Sandesh
>Priority: Minor
>
> This change is backward incompatible and cannot be done in a minor release,
> as the application type is also used to identify Apex applications.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSSION] Custom Control Tuples

2016-06-24 Thread David Yan
My initial thinking is that the custom control tuples, just like the
existing control tuples, will only be generated from the input operators
and will be propagated downstream to all operators in the DAG. So the NxM
partitioning scenario works just like how other control tuples work, i.e.
the callback will not be called unless all ports have received the control
tuple for a particular window. This creates a little bit of complication
with multiple input operators though.
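The rule that the callback is not called until all ports have received the control tuple for a window can be sketched as a per-window barrier, which also answers the N-times-or-once question: with N upstream partitions the tuple arrives N times, but the callback fires once. `ControlTupleBarrier` and its methods are hypothetical names, not Apex engine APIs, and the sketch assumes each upstream delivers a given window's control tuple at most once.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.LongConsumer;

/** Fires the user callback once per window, after every upstream has delivered. */
final class ControlTupleBarrier
{
  private final Set<Integer> upstreamIds;                           // all upstream partitions/ports
  private final Map<Long, Set<Integer>> arrived = new HashMap<>();  // windowId -> upstreams seen
  private final LongConsumer callback;                              // invoked with the windowId

  ControlTupleBarrier(Set<Integer> upstreamIds, LongConsumer callback)
  {
    this.upstreamIds = new HashSet<>(upstreamIds);
    this.callback = callback;
  }

  /** Records one arrival; returns true if this arrival completed the window. */
  boolean onControlTuple(long windowId, int upstreamId)
  {
    if (!upstreamIds.contains(upstreamId)) {
      throw new IllegalArgumentException("unknown upstream " + upstreamId);
    }
    Set<Integer> seen = arrived.computeIfAbsent(windowId, w -> new HashSet<>());
    seen.add(upstreamId);               // a repeat from the same upstream is not counted twice
    if (seen.size() == upstreamIds.size()) {
      arrived.remove(windowId);         // forget the window once it has fired
      callback.accept(windowId);
      return true;
    }
    return false;
  }
}
```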

David


On Fri, Jun 24, 2016 at 12:03 PM, Tushar Gosavi 
wrote:

> +1 for the feature
>
> I am in favor of option 1, but we may need a helper method to avoid a
> compiler error on typed ports, as calling port.emit(controlTuple) will
> be an error if the types of the control tuple and the port do not match,
> or a new method on the output port object, emitControlTuple(ControlTuple).
>
> Can you give example of piggy backing tuple with current BEGIN_WINDOW
> and END_WINDOW control tuples?
>
> In case of NxM partitioning, each downstream operator will receive N
> control tuples. Will it call the user handler N times for each downstream
> operator, or just once?
>
> Regards,
> - Tushar.
>
>
>
> On Fri, Jun 24, 2016 at 11:52 PM, David Yan  wrote:
> > Hi all,
> >
> > I would like to propose a new feature for the Apex core engine: support
> > for custom control tuples. Currently, we have control tuples such as
> > BEGIN_WINDOW, END_WINDOW, CHECKPOINT, and so on, but applications have no
> > way to insert their own control tuples. The current workaround is to use
> > data tuples with a separate port that sends such tuples to all partitions
> > of the downstream operators, which is not exactly developer friendly.
> >
> > We have already seen a number of use cases that can use this feature:
> >
> > 1) Batch support: We need to tell all operators of the physical DAG when a
> > batch starts and ends, so the operators can do whatever is needed at the
> > start or end of a batch.
> >
> > 2) Watermark: To support the concepts of event time windowing, the
> > watermark control tuple is needed to tell which windows should be
> > considered late.
> >
> > 3) Changing operator properties: We do have the support of changing
> > operator properties on the fly, but with a custom control tuple, the
> > command to change operator properties can be window aligned for all
> > partitions and also across the DAG.
> >
> > 4) Recording tuples: Like changing operator properties, we do have this
> > support now but only at the individual physical operator level, and
> > without control of which window to record tuples for. With a custom
> > control tuple, because a control tuple must belong to a window, all
> > operators in the DAG can start (and stop) recording for the same windows.
> >
> > I can think of two options to achieve this:
> >
> > 1) new custom control tuple type that takes user's serializable object.
> >
> > 2) piggy back the current BEGIN_WINDOW and END_WINDOW control tuples.
> >
> > Please provide your feedback. Thank you.
> >
> > David
>
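The typing problem raised by Tushar can be seen in a small sketch: on a port typed as `T`, `emit(T)` rejects a control tuple at compile time, so a separate entry point such as the suggested `emitControlTuple` is needed. `TypedOutputPort` and `ControlMarker` are hypothetical, simplified stand-ins, not the Apex `DefaultOutputPort`; the sketch only records what would be written to the stream.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/** Simplified typed port: data goes through emit(T), control through its own method. */
class TypedOutputPort<T>
{
  /** Everything written to the stream, data and control, in emission order. */
  final List<Object> stream = new ArrayList<>();

  void emit(T tuple)
  {
    stream.add(tuple);
  }

  /** Separate, untyped entry point so control payloads bypass the data type T. */
  void emitControlTuple(Serializable controlPayload)
  {
    stream.add(new ControlMarker(controlPayload));
  }

  /** Wrapper that lets downstream code tell control tuples apart from data tuples. */
  static final class ControlMarker
  {
    final Serializable payload;

    ControlMarker(Serializable payload)
    {
      this.payload = payload;
    }
  }
}
```

With a `TypedOutputPort<Integer>`, `port.emit("BATCH_END")` does not compile, while `port.emitControlTuple("BATCH_END")` does.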


[jira] [Commented] (APEXMALHAR-2085) Implement Windowed Operators

2016-06-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15348583#comment-15348583
 ] 

ASF GitHub Bot commented on APEXMALHAR-2085:


Github user davidyan74 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/319#discussion_r68450157
  
--- Diff: library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java ---
@@ -0,0 +1,484 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import javax.validation.ValidationException;
+
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+import org.apache.apex.malhar.lib.window.ControlTuple;
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.lib.window.WindowState;
+import org.apache.apex.malhar.lib.window.WindowedOperator;
+import org.apache.apex.malhar.lib.window.WindowedStorage;
+
+import com.google.common.base.Function;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * This is the abstract windowed operator class that implements most of the windowing, triggering, and accumulating
+ * concepts. The subclass of this abstract class is supposed to provide the implementation of how the accumulated
+ * values are stored in the storage.
+ *
+ */
+public abstract class AbstractWindowedOperator
--- End diff --

Good point. done.
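The windowing, triggering, and accumulating split described in this Javadoc can be illustrated with a minimal sketch. `Accum`, `SumAccum`, and `WindowedSum` are hypothetical names modeled loosely on the Dataflow model, not the actual Malhar `Accumulation` interface under review. The two modes show the difference between an accumulating trigger (each firing includes earlier values) and a discarding one (each firing reports only what arrived since the previous firing).

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical accumulation contract. */
interface Accum<InputT, AccumT, OutputT>
{
  AccumT defaultAccumulatedValue();

  AccumT accumulate(AccumT accumulated, InputT input);

  AccumT merge(AccumT a, AccumT b);  // needed when windows merge, e.g. sessions

  OutputT getOutput(AccumT accumulated);
}

/** Sum of longs: the accumulated value and the output are both the running total. */
final class SumAccum implements Accum<Long, Long, Long>
{
  @Override
  public Long defaultAccumulatedValue()
  {
    return 0L;
  }

  @Override
  public Long accumulate(Long accumulated, Long input)
  {
    return accumulated + input;
  }

  @Override
  public Long merge(Long a, Long b)
  {
    return a + b;
  }

  @Override
  public Long getOutput(Long accumulated)
  {
    return accumulated;
  }
}

/** Per-window state plus a trigger that keeps or discards the state after firing. */
final class WindowedSum
{
  enum Mode { ACCUMULATING, DISCARDING }

  private final Accum<Long, Long, Long> accumulation = new SumAccum();
  private final Map<Long, Long> state = new HashMap<>();  // window start -> accumulated value
  private final Mode mode;

  WindowedSum(Mode mode)
  {
    this.mode = mode;
  }

  void add(long windowStart, long value)
  {
    Long current = state.getOrDefault(windowStart, accumulation.defaultAccumulatedValue());
    state.put(windowStart, accumulation.accumulate(current, value));
  }

  /** Trigger firing: emits the current result; DISCARDING restarts the window from scratch. */
  long fire(long windowStart)
  {
    Long current = state.getOrDefault(windowStart, accumulation.defaultAccumulatedValue());
    if (mode == Mode.DISCARDING) {
      state.remove(windowStart);
    }
    return accumulation.getOutput(current);
  }
}
```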


> Implement Windowed Operators
> 
>
> Key: APEXMALHAR-2085
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2085
> Project: Apache Apex Malhar
>  Issue Type: New Feature
>Reporter: Siyuan Hua
>Assignee: David Yan
>
> As per several recent discussions in the community, a group of windowed
> operators that delivers window semantics following the Google Dataflow
> model (https://cloud.google.com/dataflow/) is very important.
> The operators should be designed and implemented with the following in mind:
> * High-level API
> * Beam translation
> * Easy to use with other popular operators
> {panel:title=Operator Hierarchy}
> Hierarchy of the operators:
> The windowed operators should cover all possible transformations that require
> a window, and batch processing is also considered as a special window called
> the global window.
> {code}
> Level 1: WindowedOperator
> Level 2: CombineOperator, GroupOperator, KeyedOperator, JoinOperator
> Level 3: KeyedCombine, KeyedGroup, CoGroup

[jira] [Commented] (APEXMALHAR-2085) Implement Windowed Operators

2016-06-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15348582#comment-15348582
 ] 

ASF GitHub Bot commented on APEXMALHAR-2085:


Github user davidyan74 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/319#discussion_r68450198
  
--- Diff: library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java ---
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+import org.apache.apex.malhar.lib.window.SessionWindowedStorage;
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.lib.window.WindowState;
+import org.apache.apex.malhar.lib.window.WindowedKeyedStorage;
+
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * This is an implementation of WindowedOperator that takes in key value pairs as input and gives out key value pairs
+ * as output. If your operation is not key based, please use {@link WindowedOperatorImpl}.
+ */
+public class KeyedWindowedOperatorImpl
--- End diff --

done


> Implement Windowed Operators
> 
>
> Key: APEXMALHAR-2085
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2085
> Project: Apache Apex Malhar
>  Issue Type: New Feature
>Reporter: Siyuan Hua
>Assignee: David Yan
>
> As per several recent discussions in the community, a group of windowed
> operators that delivers window semantics following the Google Dataflow
> model (https://cloud.google.com/dataflow/) is very important.
> The operators should be designed and implemented with the following in mind:
> * High-level API
> * Beam translation
> * Easy to use with other popular operators
> {panel:title=Operator Hierarchy}
> Hierarchy of the operators:
> The windowed operators should cover all possible transformations that require
> a window, and batch processing is also considered as a special window called
> the global window.
> {code}
> Level 1: WindowedOperator
> Level 2: CombineOperator, GroupOperator, KeyedOperator, JoinOperator
> Level 3: KeyedCombine, KeyedGroup, CoGroup
> {code}
> Combine operations include all operations that combine all tuples in one
> window into one or a small number of tuples; Group operations group all tuples
> in one window; Join and CoGroup are used to join and group tuples from
> different inputs.
> {panel}
> {panel:title=Components}
> * Window Component
> It includes the configuration, the window state that should be checkpointed, etc.
> It should support NonMergibleWindow (fixed or sliding) and MergibleWindow (session).
> * Trigger
> It should support early and late triggers with customizable trigger
> behaviour.
> * Other related components:
> ** 
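The non-mergeable (fixed, sliding) and mergeable (session) window kinds listed in the Window Component can be sketched as plain assignment functions over millisecond timestamps. `WindowAssign` and `Win` are hypothetical names, not Malhar classes; windows are assumed to be half-open intervals [start, end), and the session merge assumes timestamps arrive sorted, which a real operator could not rely on.

```java
import java.util.ArrayList;
import java.util.List;

final class WindowAssign
{
  /** Half-open window [start, end) in milliseconds. */
  static final class Win
  {
    final long start;
    final long end;

    Win(long start, long end)
    {
      this.start = start;
      this.end = end;
    }
  }

  private WindowAssign()
  {
  }

  /** Fixed (tumbling) windows: exactly one window per timestamp. */
  static Win fixed(long ts, long size)
  {
    long start = ts - Math.floorMod(ts, size);
    return new Win(start, start + size);
  }

  /** Sliding windows of length size, starting every slide ms: a timestamp may fall into several. */
  static List<Win> sliding(long ts, long size, long slide)
  {
    List<Win> result = new ArrayList<>();
    long lastStart = ts - Math.floorMod(ts, slide);
    for (long start = lastStart; start > ts - size; start -= slide) {
      result.add(new Win(start, start + size));
    }
    return result;
  }

  /** Session windows: each event opens [ts, ts + gap); overlapping windows are merged. */
  static List<Win> mergeSessions(List<Long> sortedTimestamps, long gap)
  {
    List<Win> sessions = new ArrayList<>();
    for (long ts : sortedTimestamps) {
      Win w = new Win(ts, ts + gap);
      int last = sessions.size() - 1;
      if (last >= 0 && sessions.get(last).end > w.start) {
        Win prev = sessions.remove(last);
        sessions.add(new Win(prev.start, Math.max(prev.end, w.end)));
      } else {
        sessions.add(w);
      }
    }
    return sessions;
  }
}
```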

[GitHub] apex-malhar pull request #319: APEXMALHAR-2085: REVIEW ONLY: Operator suppor...

2016-06-24 Thread davidyan74
Github user davidyan74 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/319#discussion_r68450198
  
--- Diff: library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java ---
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+import org.apache.apex.malhar.lib.window.SessionWindowedStorage;
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.lib.window.WindowState;
+import org.apache.apex.malhar.lib.window.WindowedKeyedStorage;
+
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * This is an implementation of WindowedOperator that takes in key value pairs as input and gives out key value pairs
+ * as output. If your operation is not key based, please use {@link WindowedOperatorImpl}.
+ */
+public class KeyedWindowedOperatorImpl
--- End diff --

done
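The keyed behaviour described in this Javadoc (key/value pairs in, key/value pairs out) amounts to keeping state per (window, key). The sketch below counts tuples that way; `KeyedWindowedCount` is a hypothetical name, `AbstractMap.SimpleEntry` stands in for Malhar's `KeyValPair`, and firing is assumed to discard the window's state.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Keyed sketch: state per (window, key); firing emits one (key, count) pair per key. */
final class KeyedWindowedCount<K>
{
  private final Map<Long, Map<K, Long>> state = new HashMap<>();  // window start -> key -> count

  void process(long windowStart, K key)
  {
    state.computeIfAbsent(windowStart, w -> new HashMap<>()).merge(key, 1L, Long::sum);
  }

  /** Emits (key, count) for every key seen in the window and discards the window's state. */
  List<Map.Entry<K, Long>> fire(long windowStart)
  {
    List<Map.Entry<K, Long>> out = new ArrayList<>();
    Map<K, Long> perKey = state.remove(windowStart);
    if (perKey != null) {
      for (Map.Entry<K, Long> e : perKey.entrySet()) {
        out.add(new SimpleEntry<>(e.getKey(), e.getValue()));
      }
    }
    return out;
  }
}
```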




[GitHub] apex-malhar pull request #321: APEXMALHAR-2120 #resolve #comment solve probl...

2016-06-24 Thread siyuanh
Github user siyuanh commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/321#discussion_r68420530
  
--- Diff: kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java ---
@@ -264,49 +336,57 @@ public void testInputOperator(boolean hasFailure, boolean idempotent) throws Exc
 collector.isIdempotentTest = idempotent;
 
 // Connect ports
-dag.addStream("Kafka message", node.outputPort, collector.inputPort).setLocality(Locality.CONTAINER_LOCAL);
-
+dag.addStream("Kafka message" + testName, node.outputPort, collector.inputPort).setLocality(Locality.CONTAINER_LOCAL);
 
 if (hasFailure) {
   setupHasFailureTest(node, dag);
 }
 
 // Create local cluster
-final LocalMode.Controller lc = lma.getController();
+LocalMode.Controller lc = lma.getController();
 lc.setHeartbeatMonitoringEnabled(false);
 
-lc.runAsync();
-
-// Wait 30s for consumer finish consuming all the messages
-boolean notTimeout = latch.await(4, TimeUnit.MILLISECONDS);
-Collections.sort(tupleCollection, new Comparator()
-{
-  @Override
-  public int compare(String o1, String o2)
-  {
-return Integer.parseInt(o1.split("_")[1]) - Integer.parseInt(o2.split("_")[1]);
-  }
-});
-Assert.assertTrue("TIMEOUT: 40s Collected " + tupleCollection, notTimeout);
+//let the Controller to run the inside another thread. It is almost same as call Controller.runAsync(), 
+//but Controller.runAsync() don't expose the thread which run it, so we don't know when the thread will be terminated.
+//create this thread and then call join() to make sure the Controller shutdown completely.
+monitorThread = new Thread((StramLocalCluster)lc, "master");
+monitorThread.start();
+
+boolean notTimeout = true;
+try {
+  // Wait 60s for consumer finish consuming all the messages
+  notTimeout = latch.await(waitTime, TimeUnit.MILLISECONDS);
+  lc.shutdown();
+
+  //wait until control thread finished.
+  monitorThread.join();
+} catch (Exception e) {
+  logger.warn(e.getMessage());
+}
+
+t.join();
+
+logger.info("Number of received/expected tuples: {}/{}, testName: {}, tuples: \n{}", tupleCollection.size(), expectedReceiveCount, testName, tupleCollection);
--- End diff --

Log this only when something goes wrong
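Along the lines of this review comment, the check can stay quiet on success and log the collected tuples only when it fails, while also tolerating the out-of-order and duplicate delivery described in APEXMALHAR-2120 (END_TUPLE markers mixed in, "at least once" semantics). `ReceivedTupleCheck` is a hypothetical helper, not part of the PR, and it uses `java.util.logging` instead of the test's slf4j logger only to stay self-contained.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.logging.Logger;

final class ReceivedTupleCheck
{
  private static final Logger LOG = Logger.getLogger(ReceivedTupleCheck.class.getName());

  private ReceivedTupleCheck()
  {
  }

  /**
   * At-least-once check: every expected data tuple arrived, in any order, and
   * nothing unexpected arrived; duplicates and repeated end markers are tolerated.
   * The received tuples are logged only when the check fails.
   */
  static boolean verify(Set<String> expected, List<String> received, String endMarker, int expectedEndMarkers)
  {
    Set<String> data = new HashSet<>();
    int endMarkers = 0;
    for (String tuple : received) {
      if (endMarker.equals(tuple)) {
        endMarkers++;
      } else {
        data.add(tuple);  // a set, so replayed duplicates collapse
      }
    }
    boolean ok = data.equals(expected) && endMarkers >= expectedEndMarkers;
    if (!ok) {
      LOG.warning("Received/expected data tuples: " + data.size() + "/" + expected.size()
          + ", end markers: " + endMarkers + ", tuples: " + received);
    }
    return ok;
  }
}
```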




[GitHub] apex-core pull request #351: APEXCORE-405 Common API to launch on local mode...

2016-06-24 Thread PramodSSImmaneni
GitHub user PramodSSImmaneni opened a pull request:

https://github.com/apache/apex-core/pull/351

APEXCORE-405 Common API to launch on local mode or cluster

@tweise please take a look.

Common API to launch on local mode or cluster. No functionality is lost in
local mode, but it is cast in a different way (LocalModeImpl updated
appropriately). Some methods are deprecated, but they don't have to be.
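The idea of one launch API for both targets can be sketched as an interface plus a local implementation. `Launcher`, `AppHandle`, and `LocalLauncher` are hypothetical names, not the API in this pull request; a cluster implementation would presumably submit to YARN behind the same interface. Returning a handle that can shut the app down and wait for it also avoids the problem noted in the KafkaInputOperatorTest diff, where `Controller.runAsync()` does not expose the thread it runs on.

```java
/** Hypothetical handle to a launched application, the same for local and cluster mode. */
interface AppHandle
{
  boolean isFinished();

  void shutdown() throws InterruptedException;
}

/** Hypothetical common launch API; callers pick the implementation at runtime. */
interface Launcher
{
  AppHandle launch(Runnable app);
}

/** Local-mode implementation: runs the application in a thread of this JVM. */
final class LocalLauncher implements Launcher
{
  @Override
  public AppHandle launch(Runnable app)
  {
    final Thread thread = new Thread(app, "local-app");
    thread.start();
    return new AppHandle()
    {
      @Override
      public boolean isFinished()
      {
        return !thread.isAlive();
      }

      @Override
      public void shutdown() throws InterruptedException
      {
        thread.interrupt();  // ask the application to stop
        thread.join();       // and wait until it actually has
      }
    };
  }
}
```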

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/PramodSSImmaneni/incubator-apex-core APEXCORE-405

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-core/pull/351.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #351


commit e7f5e9749fa2f13af1dbf56ebcf43a6d05e4578e
Author: Pramod Immaneni 
Date:   2016-06-24T07:44:57Z

APEXCORE-405 Common API to launch on local mode or cluster

commit 1de1c70886c18cd9a452d3f4669240582e8f188e
Author: Pramod Immaneni 
Date:   2016-06-24T07:59:51Z

Checkstyle fixes






[jira] [Commented] (APEXCORE-405) Provide an API to launch DAG on the cluster

2016-06-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXCORE-405?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15347935#comment-15347935
 ] 

ASF GitHub Bot commented on APEXCORE-405:
-

GitHub user PramodSSImmaneni opened a pull request:

https://github.com/apache/apex-core/pull/351

APEXCORE-405 Common API to launch on local mode or cluster

@tweise please take a look.

Common API to launch on local mode or cluster. No functionality is lost in
local mode, but it is cast in a different way (LocalModeImpl updated
appropriately). Some methods are deprecated, but they don't have to be.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/PramodSSImmaneni/incubator-apex-core APEXCORE-405

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-core/pull/351.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #351


commit e7f5e9749fa2f13af1dbf56ebcf43a6d05e4578e
Author: Pramod Immaneni 
Date:   2016-06-24T07:44:57Z

APEXCORE-405 Common API to launch on local mode or cluster

commit 1de1c70886c18cd9a452d3f4669240582e8f188e
Author: Pramod Immaneni 
Date:   2016-06-24T07:59:51Z

Checkstyle fixes




> Provide an API to launch DAG on the cluster
> ---
>
> Key: APEXCORE-405
> URL: https://issues.apache.org/jira/browse/APEXCORE-405
> Project: Apache Apex Core
>  Issue Type: Improvement
>Reporter: Pramod Immaneni
>Assignee: Pramod Immaneni
>
> Today an API exists to launch a DAG in local mode, but no such API is
> available to launch the app on the cluster; only a CLI tool is available.
> Provide an API to do this.


