[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-20 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15932258#comment-15932258 ]

ASF GitHub Bot commented on FLINK-4460:
---

Github user jgrier commented on the issue:

https://github.com/apache/flink/pull/2982
  
Nice :)


> Side Outputs in Flink
> ---------------------
>
>                 Key: FLINK-4460
>                 URL: https://issues.apache.org/jira/browse/FLINK-4460
>             Project: Flink
>          Issue Type: New Feature
>          Components: Core, DataStream API
>    Affects Versions: 1.2.0, 1.1.3
>            Reporter: Chen Qin
>            Assignee: Chen Qin
>              Labels: latearrivingevents, sideoutput
>             Fix For: 1.3.0
>
>
> https://docs.google.com/document/d/1vg1gpR8JL4dM07Yu4NyerQhhVvBlde5qdqnuJv4LcV4/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-18 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15931107#comment-15931107 ]

ASF GitHub Bot commented on FLINK-4460:
---

Github user chenqin closed the pull request at:

https://github.com/apache/flink/pull/2982


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-18 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15931106#comment-15931106 ]

ASF GitHub Bot commented on FLINK-4460:
---

Github user chenqin commented on the issue:

https://github.com/apache/flink/pull/2982
  
Wow, finally in!

It was fun, and thanks for your help along every step of the journey!

Chen


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-18 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15931104#comment-15931104 ]

ASF GitHub Bot commented on FLINK-4460:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2982
  
@chenqin I finally merged it. Could you please also close this PR?

And thanks again for working on this for so long!  


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-18 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15931103#comment-15931103 ]

ASF GitHub Bot commented on FLINK-4460:
---

Github user aljoscha closed the pull request at:

https://github.com/apache/flink/pull/3484


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930241#comment-15930241 ]

ASF GitHub Bot commented on FLINK-4460:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/3484
  
Thanks for reviewing again, @kl0u! I incorporated all your suggestions. I'm now waiting for Travis to give the green light, and then I'll merge.

@chenqin Many thanks also to you for working on this and pushing it through with me!


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930220#comment-15930220 ]

ASF GitHub Bot commented on FLINK-4460:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r106685158
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java ---
@@ -48,15 +49,25 @@
 * output selection).
 */
private final List selectedNames;
+
+   /**
+* The side-output tag (if any) of this {@link StreamEdge}.
+*/
+   private final OutputTag outputTag;
+
+   /**
+* The {@link StreamPartitioner} on this {@link StreamEdge}.
+*/
private StreamPartitioner outputPartitioner;
 
public StreamEdge(StreamNode sourceVertex, StreamNode targetVertex, int typeNumber,
-   List selectedNames, StreamPartitioner outputPartitioner) {
+   List selectedNames, StreamPartitioner outputPartitioner, OutputTag outputTag) {
this.sourceVertex = sourceVertex;
this.targetVertex = targetVertex;
this.typeNumber = typeNumber;
this.selectedNames = selectedNames;
this.outputPartitioner = outputPartitioner;
+   this.outputTag = outputTag;
 
--- End diff --

I'm not sure what exactly the edge id does or who uses it, so I'd prefer not to touch it for now.


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930214#comment-15930214 ]

ASF GitHub Bot commented on FLINK-4460:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r106684413
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---
@@ -567,6 +600,17 @@ protected boolean isLate(W window) {
}
 
/**
+* Decide if a record is currently late, based on current watermark and allowed lateness.
+*
+* @param element The element to check
+* @return The element for which should be considered when sideoutputs
+*/
+   protected boolean isLate(StreamRecord element){
--- End diff --

That is what I remember as well.


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930204#comment-15930204 ]

ASF GitHub Bot commented on FLINK-4460:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r106684096
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---
@@ -567,6 +600,17 @@ protected boolean isLate(W window) {
}
 
/**
+* Decide if a record is currently late, based on current watermark and allowed lateness.
+*
+* @param element The element to check
+* @return The element for which should be considered when sideoutputs
+*/
+   protected boolean isLate(StreamRecord element){
--- End diff --

I must have removed the check by accident. I think we agreed to rename this to something more meaningful and keep it, right?
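For readers following the rename discussion, here is a minimal sketch of an element-level lateness check based on the watermark and the allowed lateness. It is a self-contained model, not the `WindowOperator` code from the PR: the method name `isElementLate` and its parameters are assumptions for illustration, and it assumes an element counts as late under event time once its timestamp plus the allowed lateness is no longer after the current watermark.

```java
// Self-contained sketch; the method name and parameters are illustrative
// assumptions, not the WindowOperator API from the pull request.
class LatenessSketch {

    // An element is considered late only under event time, and only when its
    // timestamp plus the allowed lateness does not exceed the current watermark.
    static boolean isElementLate(boolean isEventTime, long timestamp,
                                 long allowedLateness, long currentWatermark) {
        return isEventTime && timestamp + allowedLateness <= currentWatermark;
    }

    public static void main(String[] args) {
        long watermark = 100L;
        System.out.println(isElementLate(true, 90L, 10L, watermark));  // true: 90 + 10 <= 100
        System.out.println(isElementLate(true, 91L, 10L, watermark));  // false: 91 + 10 > 100
        System.out.println(isElementLate(false, 0L, 0L, watermark));   // false: not event time
    }
}
```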


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-15 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926168#comment-15926168 ]

ASF GitHub Bot commented on FLINK-4460:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r106163521
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java ---
@@ -63,6 +66,7 @@
private TypeSerializer typeSerializerIn1;
private TypeSerializer typeSerializerIn2;
private TypeSerializer typeSerializerOut;
+   private Map typeSerializerMap;
--- End diff --

This is not used anywhere in the code. Can it be removed, along with the `getTypeSerializerOut()` and `setTypeSerializerOut()`?


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-15 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926171#comment-15926171 ]

ASF GitHub Bot commented on FLINK-4460:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r106162497
  
--- Diff: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/sideoutput/SideOutputExample.java ---
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.sideoutput;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.Collector;
+
+/**
+ * An example that illustrates the use of side outputs.
+ *
+ * This is a modified version of {@link org.apache.flink.streaming.examples.windowing.WindowWordCount}
+ * that has a filter in the tokenizer and only emits some words for counting
+ * while emitting the other words to a side output.
+ */
+public class SideOutputExample {
+
+   /**
+* We need to create an {@link OutputTag} so that we can reference it when emitting
+* data to a side output and also to retrieve the side output stream from an operation.
+*/
+   static final OutputTag rejectedWordsTag = new OutputTag("rejected") {};
--- End diff --

Here we add a side output but do nothing to show that it works. We could probably prefix each rejected record with "rejected-" and print it, so that the user can see what the side output does.
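To make the suggestion concrete, the sketch below models the proposed behaviour without any Flink dependencies: a line is split into words, short words go to the main output, and longer words are prefixed with `rejected-` and added to a second list standing in for the side output. In the actual example this routing would happen inside a `ProcessFunction`, using `ctx.output(rejectedWordsTag, ...)` for the side output and `out.collect(...)` for the main output, with the side stream retrieved through `getSideOutput(rejectedWordsTag)` and printed. The length threshold and the class and method names are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Self-contained model of the suggested example output; it does not use Flink.
// The "longer than maxLength" rejection rule is an assumed stand-in for the
// example's tokenizer filter.
class RejectedWordsSketch {

    static void tokenize(String line, int maxLength,
                         List<String> mainOutput, List<String> sideOutput) {
        for (String token : line.toLowerCase().split("\\W+")) {
            if (token.isEmpty()) {
                continue; // skip empty tokens produced by leading separators
            }
            if (token.length() > maxLength) {
                // Prefix rejected records so they are easy to recognise when printed.
                sideOutput.add("rejected-" + token);
            } else {
                mainOutput.add(token);
            }
        }
    }

    public static void main(String[] args) {
        List<String> main = new ArrayList<>();
        List<String> side = new ArrayList<>();
        tokenize("To be, or not to be: that is the question", 5, main, side);
        System.out.println("main: " + main); // main: [to, be, or, not, to, be, that, is, the]
        System.out.println("side: " + side); // side: [rejected-question]
    }
}
```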


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-15 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926170#comment-15926170 ]

ASF GitHub Bot commented on FLINK-4460:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r106163045
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java ---
@@ -333,32 +373,39 @@ public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int ty
downStreamVertexID,
typeNumber,
null,
-   new ArrayList());
+   new ArrayList(), null);
--- End diff --

The `null` should go to the next line for uniformity.


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-15 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926169#comment-15926169 ]

ASF GitHub Bot commented on FLINK-4460:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r106162749
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java ---
@@ -48,15 +49,25 @@
 * output selection).
 */
private final List selectedNames;
+
+   /**
+* The side-output tag (if any) of this {@link StreamEdge}.
+*/
+   private final OutputTag outputTag;
+
+   /**
+* The {@link StreamPartitioner} on this {@link StreamEdge}.
+*/
private StreamPartitioner outputPartitioner;
 
public StreamEdge(StreamNode sourceVertex, StreamNode targetVertex, int typeNumber,
-   List selectedNames, StreamPartitioner outputPartitioner) {
+   List selectedNames, StreamPartitioner outputPartitioner, OutputTag outputTag) {
this.sourceVertex = sourceVertex;
this.targetVertex = targetVertex;
this.typeNumber = typeNumber;
this.selectedNames = selectedNames;
this.outputPartitioner = outputPartitioner;
+   this.outputTag = outputTag;
 
--- End diff --

Does it make sense to add the outputTag also in the `edgeId`?
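One way to see why the tag might matter for `edgeId`: if the id is built only from the source, target, type number and selected names, two edges between the same pair of nodes that differ only in their output tag (for example, the main output and a side output of one operator both feeding the same downstream operator) would get identical ids. The sketch below uses a hypothetical, simplified edge class, not `StreamEdge`, and the id format is an assumption. Whether such a collision actually matters depends on how `edgeId` is used, which the thread leaves open.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified edge used only to illustrate id collisions.
// The id format is an assumption, not the one used by StreamEdge.
class EdgeIdSketch {

    static final class Edge {
        final int source;
        final int target;
        final int typeNumber;
        final List<String> selectedNames;
        final String outputTag; // null means the edge carries the main output

        Edge(int source, int target, int typeNumber, List<String> selectedNames, String outputTag) {
            this.source = source;
            this.target = target;
            this.typeNumber = typeNumber;
            this.selectedNames = selectedNames;
            this.outputTag = outputTag;
        }

        // Id built only from the fields that existed before side outputs.
        String idWithoutTag() {
            return source + "_" + target + "_" + typeNumber + "_" + selectedNames;
        }

        // Id that additionally encodes the side-output tag ("null" for the main output).
        String idWithTag() {
            return idWithoutTag() + "_" + outputTag;
        }
    }

    public static void main(String[] args) {
        List<String> none = Collections.emptyList();
        Edge mainEdge = new Edge(1, 2, 0, none, null);
        Edge sideEdge = new Edge(1, 2, 0, none, "late-data");
        System.out.println(mainEdge.idWithoutTag().equals(sideEdge.idWithoutTag())); // true: ids collide
        System.out.println(mainEdge.idWithTag().equals(sideEdge.idWithTag()));       // false
    }
}
```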


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-15 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15925916#comment-15925916 ]

ASF GitHub Bot commented on FLINK-4460:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r106138342
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java ---
@@ -416,4 +428,26 @@ private boolean canBeParallel() {
transformation.setSlotSharingGroup(slotSharingGroup);
return this;
}
+
+   /**
+* Gets the {@link DataStream} that contains the elements that are emitted from an operation
+* into the side output with the given {@link OutputTag}.
+*
+* @see org.apache.flink.streaming.api.functions.ProcessFunction.Context#output(OutputTag, Object)
+*/
+   public  DataStream getSideOutput(OutputTag sideOutputTag){
+   sideOutputTag = clean(sideOutputTag);
--- End diff --

I think it is better not to reuse the argument variable but to create a new one.


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-15 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15925923#comment-15925923 ]

ASF GitHub Bot commented on FLINK-4460:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r106139311
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ---
@@ -326,33 +327,46 @@ public int getChainLength() {
Map chainedConfigs,
ClassLoader userCodeClassloader,
Map streamOutputs,
-   List allOperators)
+   List allOperators,
+   OutputTag outputTag)
{
// create the output that the operator writes to first. this may recursively create more operators
Output output = createOutputCollector(
containingTask, operatorConfig, chainedConfigs, userCodeClassloader, streamOutputs, allOperators);
 
// now create the operator and give it the output collector to write its output to
OneInputStreamOperator chainedOperator = operatorConfig.getStreamOperator(userCodeClassloader);
+
chainedOperator.setup(containingTask, operatorConfig, output);
 
allOperators.add(chainedOperator);
 
if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
-   return new ChainingOutput<>(chainedOperator, this);
+   return new ChainingOutput<>(chainedOperator, this, outputTag);
}
else {
TypeSerializer inSerializer = operatorConfig.getTypeSerializerIn1(userCodeClassloader);
-   return new CopyingChainingOutput<>(chainedOperator, inSerializer, this);
+   return new CopyingChainingOutput<>(chainedOperator, inSerializer, outputTag, this);
}
}

private  RecordWriterOutput createStreamOutput(
StreamEdge edge, StreamConfig upStreamConfig, int outputIndex,
Environment taskEnvironment,
-   String taskName)
-   {
-   TypeSerializer outSerializer = upStreamConfig.getTypeSerializerOut(taskEnvironment.getUserClassLoader());
+   String taskName) {
+   OutputTag sideOutputTag = edge.getOutputTag(); // OutputTag, return null if not sideOutput
+
+   TypeSerializer outSerializer = null;
+
+   if (edge.getOutputTag() != null) {
+   // side output
+   outSerializer = upStreamConfig.getTypeSerializerSideOut(
+   edge.getOutputTag(), taskEnvironment.getUserClassLoader());
+   } else {
+   // main output
--- End diff --

This can become one line.
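The diff threads an `OutputTag` through `ChainingOutput` and `CopyingChainingOutput` and picks the serializer per edge depending on whether the edge carries a tag. A plausible reading, sketched below as a self-contained model, is that every chained output is bound to at most one tag and ignores records emitted under any other tag, while untagged outputs receive only main-output records. The class names and the string-based tags are hypothetical; this is not Flink's `Output` interface.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of tag-aware chained outputs; not Flink's Output interface.
class TaggedOutputSketch {

    static final class ChainedOutput {
        final String boundTag; // null: this output forwards the main output
        final List<String> received = new ArrayList<>();

        ChainedOutput(String boundTag) {
            this.boundTag = boundTag;
        }

        // Main-output records are forwarded only by outputs without a tag.
        void collect(String record) {
            if (boundTag == null) {
                received.add(record);
            }
        }

        // Side-output records are forwarded only by the output bound to that tag.
        void collect(String tag, String record) {
            if (boundTag != null && boundTag.equals(tag)) {
                received.add(record);
            }
        }
    }

    // An operator emits to all of its outputs; each one decides whether it is responsible.
    static void emit(List<ChainedOutput> outputs, String tag, String record) {
        for (ChainedOutput out : outputs) {
            if (tag == null) {
                out.collect(record);
            } else {
                out.collect(tag, record);
            }
        }
    }

    public static void main(String[] args) {
        ChainedOutput mainOut = new ChainedOutput(null);
        ChainedOutput lateOut = new ChainedOutput("late-data");
        List<ChainedOutput> outputs = Arrays.asList(mainOut, lateOut);

        emit(outputs, null, "a");
        emit(outputs, "late-data", "b");
        emit(outputs, "other", "c"); // no output is bound to "other", so it is dropped

        System.out.println(mainOut.received); // [a]
        System.out.println(lateOut.received); // [b]
    }
}
```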


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-15 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15925918#comment-15925918 ]

ASF GitHub Bot commented on FLINK-4460:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r106138237
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java ---
@@ -416,4 +428,26 @@ private boolean canBeParallel() {
transformation.setSlotSharingGroup(slotSharingGroup);
return this;
}
+
+   /**
+* Gets the {@link DataStream} that contains the elements that are emitted from an operation
+* into the side output with the given {@link OutputTag}.
+*
+* @see org.apache.flink.streaming.api.functions.ProcessFunction.Context#output(OutputTag, Object)
+*/
--- End diff --

Missing space between the `)` and the `{`.


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-15 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15925917#comment-15925917 ]

ASF GitHub Bot commented on FLINK-4460:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r106138473
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java ---
@@ -416,4 +428,26 @@ private boolean canBeParallel() {
transformation.setSlotSharingGroup(slotSharingGroup);
return this;
}
+
+   /**
+* Gets the {@link DataStream} that contains the elements that are emitted from an operation
+* into the side output with the given {@link OutputTag}.
+*
+* @see org.apache.flink.streaming.api.functions.ProcessFunction.Context#output(OutputTag, Object)
+*/
+   public  DataStream getSideOutput(OutputTag sideOutputTag){
+   sideOutputTag = clean(sideOutputTag);
+
+   TypeInformation type = requestedSideOutputs.get(sideOutputTag);
+   if (type != null && !type.equals(sideOutputTag.getTypeInfo())) {
+   throw new UnsupportedOperationException("A side output with a matching id was " +
+   "already requested with a different type. This is not allowed, side output " +
+   "ids need to be unique.");
+   }
+
+   requestedSideOutputs.put(sideOutputTag, sideOutputTag.getTypeInfo());
+
--- End diff --

The `requireNonNull` should be at the beginning of the method.
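The checks discussed here can be modelled in isolation. The sketch below uses hypothetical names, a `String` id in place of the `OutputTag`, and plain `Class` objects in place of Flink's `TypeInformation`. It validates its arguments with `Objects.requireNonNull` before touching any state, then rejects a second request for the same id with a different type, mirroring the `UnsupportedOperationException` in the diff.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical model of the bookkeeping in getSideOutput(); Class<?> stands in
// for Flink's TypeInformation and a String id stands in for the OutputTag.
class SideOutputRegistrySketch {

    private final Map<String, Class<?>> requestedSideOutputs = new HashMap<>();

    void request(String tagId, Class<?> type) {
        // Validate the arguments first, before any state is read or modified.
        Objects.requireNonNull(tagId, "tag id must not be null");
        Objects.requireNonNull(type, "type must not be null");

        Class<?> existing = requestedSideOutputs.get(tagId);
        if (existing != null && !existing.equals(type)) {
            throw new UnsupportedOperationException("A side output with id '" + tagId
                    + "' was already requested with type " + existing.getName()
                    + "; side output ids need to be unique.");
        }
        requestedSideOutputs.put(tagId, type);
    }

    public static void main(String[] args) {
        SideOutputRegistrySketch registry = new SideOutputRegistrySketch();
        registry.request("late-data", String.class);
        registry.request("late-data", String.class); // same id, same type: allowed
        try {
            registry.request("late-data", Long.class);
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```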


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-15 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15925925#comment-15925925 ]

ASF GitHub Bot commented on FLINK-4460:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r106138895
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SideOutputTransformation.java ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.transformations;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+
+import java.util.Collection;
+import java.util.List;
+
+
+/**
+ * This transformation represents a selection of a side output of an upstream operation with a
+ * given {@link OutputTag}.
+ *
+ * This does not create a physical operation, it only affects how upstream operations are
+ * connected to downstream operations.
+ *
+ * @param  The type of the elements that result from this {@code SideOutputTransformation}
+ */
--- End diff --

Here we do not check whether `input` is `null` (we only do that in the calling method), but we still try to get its parallelism. We could pass the parallelism as a separate argument and then, after the `super()` call, check whether the input is null.
This makes the class self-contained, as you do not have to look at other classes to see whether `input` can be `null`. What do you think?
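A minimal sketch of the suggested constructor shape, using hypothetical stand-in classes rather than Flink's transformation hierarchy. Because Java does not allow statements before the `super(...)` call, the parallelism is passed in as its own argument, so `super(...)` never dereferences `input`, and the null checks follow immediately afterwards inside the class itself.

```java
import java.util.Objects;

// Hypothetical stand-ins for the transformation classes; not Flink's API.
class TransformationSketch {

    static class Transformation {
        final String name;
        final int parallelism;

        Transformation(String name, int parallelism) {
            this.name = name;
            this.parallelism = parallelism;
        }
    }

    static final class SideOutputSelection extends Transformation {
        final Transformation input;
        final String tagId;

        // Parallelism is an explicit argument, so super(...) does not touch input;
        // the class validates its own inputs instead of relying on its callers.
        SideOutputSelection(Transformation input, String tagId, int parallelism) {
            super("SideOutput", parallelism);
            this.input = Objects.requireNonNull(input, "input must not be null");
            this.tagId = Objects.requireNonNull(tagId, "tagId must not be null");
        }
    }

    public static void main(String[] args) {
        Transformation source = new Transformation("source", 4);
        SideOutputSelection late = new SideOutputSelection(source, "late-data", source.parallelism);
        System.out.println(late.name + " parallelism=" + late.parallelism); // SideOutput parallelism=4
        try {
            new SideOutputSelection(null, "late-data", 1);
        } catch (NullPointerException e) {
            System.out.println("caught: " + e.getMessage()); // caught: input must not be null
        }
    }
}
```

An alternative that keeps the original signature is to wrap the argument inline, as in `super(..., Objects.requireNonNull(input).getParallelism())`. This also fails fast with a clear exception, but it keeps the check buried inside the `super(...)` call.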


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-15 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15925924#comment-15925924 ]

ASF GitHub Bot commented on FLINK-4460:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r106138062
  
--- Diff: flink-core/src/main/java/org/apache/flink/util/OutputTag.java ---
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.util;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+
+/**
+ * An {@link OutputTag} is a typed and named tag to use for tagging side outputs
+ * of an operator.
+ *
+ * An {@code OutputTag} must always be an anonymous inner class so that Flink can derive
+ * a {@link TypeInformation} for the generic type parameter.
+ *
+ * Example:
+ * {@code
+ * OutputTag> info = new OutputTag>("late-data"){});
+ * }
+ *
+ * @param  the type of elements in the side-output stream.
+ */
+@PublicEvolving
+public class OutputTag implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   private final String id;
+
+   private transient TypeInformation typeInfo;
+
+   /**
+* Creates a new named {@code OutputTag} with the given id.
+*
+* @param id The id of the created {@code OutputTag}.
+ */
+   public OutputTag(String id) {
+   Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
+   this.id = requireNonNull(id);
+
+   try {
+   TypeHint typeHint =
+   new TypeHint(OutputTag.class, this, 0) {};
+   this.typeInfo = typeHint.getTypeInfo();
+   } catch (InvalidTypesException e) {
+   throw new InvalidTypesException("Could not determine TypeInformation for generic " +
+   "OutputTag type. Did you forget to make your OutputTag an anonymous inner class?", e);
+   }
+   }
+
+   /**
+* Creates a new named {@code OutputTag} with the given id and output {@link TypeInformation}.
+*
+* @param id The id of the created {@code OutputTag}.
+* @param typeInfo The {@code TypeInformation} for the side output.
+*/
+   public OutputTag(String id, TypeInformation typeInfo) {
+   this.id = Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
+   this.typeInfo =
+   Preconditions.checkNotNull(typeInfo, "TypeInformation cannot be null.");
+   }
+
+   private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+   in.defaultReadObject();
+   typeInfo = null;
+   }
+
+   public String getId() {
+   return id;
+   }
+
+   public TypeInformation getTypeInfo() {
+   return typeInfo;
+   }
+
+   @Override
+   public boolean equals(Object obj) {
+   return obj instanceof OutputTag
--- End diff --

Still, the first comment applies: the `equals` can be simplified, given that 
`id != null`.
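
Because both constructors reject a null `id`, `equals` needs no null handling for it and can compare the ids directly, with `hashCode` derived from the same field. A minimal sketch, assuming equality is based on the id alone; `TagSketch` is a hypothetical stand-in for `OutputTag` that leaves out the type information:

```java
import java.util.Objects;

/** Hypothetical stand-in for OutputTag; only the non-null id takes part in equality. */
class TagSketch {
    private final String id;

    TagSketch(String id) {
        // Mirrors the constructor precondition: afterwards, id is never null.
        this.id = Objects.requireNonNull(id, "OutputTag id cannot be null.");
    }

    String getId() {
        return id;
    }

    @Override
    public boolean equals(Object obj) {
        // No null handling for id is needed, because the constructor guarantees id != null.
        // instanceof (rather than a getClass() comparison) also accepts anonymous subclasses,
        // which is how the javadoc says tags should be created.
        return obj instanceof TagSketch && id.equals(((TagSketch) obj).id);
    }

    @Override
    public int hashCode() {
        // Consistent with equals: equal ids yield equal hash codes.
        return id.hashCode();
    }
}
```

The `instanceof` choice matters here: every `new ...("id") {}` expression creates its own anonymous class, so a `getClass() == obj.getClass()` check would make two tags with the same id unequal whenever they are declared in different places.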


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15925919#comment-15925919
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r106139079
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---
@@ -567,6 +600,17 @@ protected boolean isLate(W window) {
}
 
/**
+* Decide if a record is currently late, based on current watermark and allowed lateness.
+*
+* @param element The element to check
+* @return The element for which should be considered when sideoutputs
+*/
+   protected boolean isLate(StreamRecord element){
--- End diff --

This is not used any more, right? So it can be deleted.
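
For context, the rule that the javadoc above describes (lateness judged against the current watermark plus the allowed lateness) can be sketched as a small pure function. This is a hypothetical helper, not the method under review, and it assumes event time and that an element exactly at the boundary already counts as late:

```java
/** Hypothetical helper illustrating a watermark-based lateness test (event time assumed). */
final class LatenessSketch {
    private LatenessSketch() {}

    /**
     * Returns true if an element with the given timestamp is late, i.e. the watermark
     * has already reached its timestamp plus the allowed lateness.
     */
    static boolean isLate(long timestamp, long allowedLateness, long currentWatermark) {
        // Assumption of this sketch: the boundary case (equality) is treated as late.
        return timestamp + allowedLateness <= currentWatermark;
    }
}
```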


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15925921#comment-15925921
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r106139593
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ---
@@ -441,26 +486,53 @@ public void close() {
private static final class CopyingChainingOutput extends ChainingOutput {

private final TypeSerializer serializer;
-   
+
public CopyingChainingOutput(
OneInputStreamOperator operator,
TypeSerializer serializer,
+   OutputTag outputTag,
StreamStatusProvider streamStatusProvider) {
-   super(operator, streamStatusProvider);
+   super(operator, streamStatusProvider, outputTag);
this.serializer = serializer;
}
 
@Override
public void collect(StreamRecord record) {
+   if (this.outputTag != null) {
+   // we are only responsible for emitting to the main input
+   return;
+   }
+
+   pushToOperator(record);
+   }
+
+   @Override
+   public  void collect(OutputTag outputTag, StreamRecord record) {
+   if (this.outputTag == null || !this.outputTag.equals(outputTag)) {
+   // we are only responsible for emitting to the side-output specified by our
+   // OutputTag.
+   return;
+   }
+
+   pushToOperator(record);
+   }
+
+   @Override
+   protected  void pushToOperator(StreamRecord record) {
try {
+   // we know that the given outputTag matches our OutputTag so the record
+   // must be of the type that our operator (and Serializer) expects.
+   @SuppressWarnings("unchecked")
+   StreamRecord castRecord = (StreamRecord) record;
+
numRecordsIn.inc();
-   StreamRecord copy = record.copy(serializer.copy(record.getValue()));
+   StreamRecord copy = castRecord.copy(serializer.copy(castRecord.getValue()));
operator.setKeyContextElement1(copy);
-   operator.processElement(copy);
-   }
-   catch (Exception e) {
+   operator.processElement(castRecord);
--- End diff --

This should be `copy`, not `castRecord`.
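
The reason this matters: `CopyingChainingOutput` exists so that the chained operator gets its own copy of each record, independent of the object the upstream side keeps using. With `castRecord` passed on, the key context is set from the copy but the operator processes the shared original, so the copy is effectively wasted. A minimal, self-contained sketch of the effect, using hypothetical `MutableEvent` and `Downstream` types in place of Flink's `StreamRecord` and operator:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of why a copying output must hand the copy downstream. */
final class CopySketch {
    /** Mutable payload, standing in for a user object that upstream code reuses. */
    static final class MutableEvent {
        int value;
        MutableEvent(int value) { this.value = value; }
        MutableEvent deepCopy() { return new MutableEvent(value); }
    }

    /** Downstream "operator" that simply buffers what it receives. */
    static final class Downstream {
        final List<MutableEvent> received = new ArrayList<>();
        void processElement(MutableEvent e) { received.add(e); }
    }

    /** Buggy variant: makes a copy, but forwards the original object. */
    static void pushBuggy(Downstream op, MutableEvent record) {
        MutableEvent copy = record.deepCopy();
        op.processElement(record); // should be 'copy'
    }

    /** Fixed variant: the downstream operator gets its own instance. */
    static void pushFixed(Downstream op, MutableEvent record) {
        MutableEvent copy = record.deepCopy();
        op.processElement(copy);
    }

    /** Emits two values while reusing one object, then reports what downstream holds. */
    static List<Integer> run(boolean fixed) {
        Downstream op = new Downstream();
        MutableEvent reused = new MutableEvent(1);
        if (fixed) { pushFixed(op, reused); } else { pushBuggy(op, reused); }
        reused.value = 2; // upstream mutates the object after emitting it
        if (fixed) { pushFixed(op, reused); } else { pushBuggy(op, reused); }
        List<Integer> seen = new ArrayList<>();
        for (MutableEvent e : op.received) { seen.add(e.value); }
        return seen;
    }
}
```

With the fix, downstream sees `[1, 2]`; with the bug, both buffered entries are the same object and read `[2, 2]`.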


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15925922#comment-15925922
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r106139449
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ---
@@ -387,18 +401,49 @@ public int getChainLength() {
 
protected final StreamStatusProvider streamStatusProvider;
 
-   public ChainingOutput(OneInputStreamOperator operator, StreamStatusProvider streamStatusProvider) {
+   protected final OutputTag outputTag;
+
+   public ChainingOutput(
+   OneInputStreamOperator operator,
+   StreamStatusProvider streamStatusProvider,
+   OutputTag outputTag) {
this.operator = operator;
this.numRecordsIn = ((OperatorMetricGroup) operator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
this.streamStatusProvider = streamStatusProvider;
+   this.outputTag = outputTag;
}
 
@Override
public void collect(StreamRecord record) {
+   if (this.outputTag != null) {
+   // we are only responsible for emitting to the main input
+   return;
+   }
+
+   pushToOperator(record);
+   }
+
+   @Override
+   public  void collect(OutputTag outputTag, StreamRecord record) {
+   if (this.outputTag == null || !this.outputTag.equals(outputTag)) {
+   // we are only responsible for emitting to the side-output specified by our
+   // OutputTag.
+   return;
+   }
+
+   pushToOperator(record);
+   }
+
--- End diff --

This can become `private` as the copying alternative has its own 
implementation, right?
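
Making the helper `private` works because `CopyingChainingOutput` overrides both `collect()` variants itself: a private method is not inherited and cannot be overridden, so each class's `collect()` methods bind to that class's own helper (the `@Override` on the subclass's `pushToOperator` would then have to go). A heavily simplified, hypothetical sketch of that shape; `BaseOutput`, `CopyingOutput`, and the `String` tags are stand-ins, not Flink classes:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: each class routes both collect() variants through its own private helper. */
final class PrivateHelperSketch {
    static class BaseOutput {
        protected final String outputTag; // null means "main output"
        final List<String> pushed = new ArrayList<>();

        BaseOutput(String outputTag) { this.outputTag = outputTag; }

        void collect(String record) {
            if (outputTag != null) {
                return; // only responsible for the main output
            }
            push(record);
        }

        void collect(String tag, String record) {
            if (outputTag == null || !outputTag.equals(tag)) {
                return; // only responsible for the side output named by our tag
            }
            push(record);
        }

        private void push(String record) {
            pushed.add(record);
        }
    }

    static final class CopyingOutput extends BaseOutput {
        CopyingOutput(String outputTag) { super(outputTag); }

        @Override
        void collect(String record) {
            if (outputTag != null) {
                return;
            }
            push(record);
        }

        @Override
        void collect(String tag, String record) {
            if (outputTag == null || !outputTag.equals(tag)) {
                return;
            }
            push(record);
        }

        // Not an override: BaseOutput.push is private, so this is an independent method.
        private void push(String record) {
            pushed.add("copy-of-" + record);
        }
    }
}
```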


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15925920#comment-15925920
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r106139530
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ---
@@ -441,26 +486,53 @@ public void close() {
private static final class CopyingChainingOutput extends ChainingOutput {

private final TypeSerializer serializer;
-   
+
public CopyingChainingOutput(
OneInputStreamOperator operator,
TypeSerializer serializer,
+   OutputTag outputTag,
StreamStatusProvider streamStatusProvider) {
-   super(operator, streamStatusProvider);
+   super(operator, streamStatusProvider, outputTag);
this.serializer = serializer;
}
 
@Override
public void collect(StreamRecord record) {
+   if (this.outputTag != null) {
+   // we are only responsible for emitting to the main input
+   return;
+   }
+
+   pushToOperator(record);
+   }
+
+   @Override
+   public  void collect(OutputTag outputTag, StreamRecord record) {
+   if (this.outputTag == null || !this.outputTag.equals(outputTag)) {
+   // we are only responsible for emitting to the side-output specified by our
+   // OutputTag.
+   return;
+   }
+
+   pushToOperator(record);
+   }
+
+   @Override
--- End diff --

This can become `private`, as before.


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15906129#comment-15906129
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/3484
  
Thanks @aljoscha, I will have a look on Monday.


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15906128#comment-15906128
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/3484
  
@kl0u @chenqin I cleaned up the commits and distributed the fixes from the 
comments to the right commits. I also added more tests/ITCases for detecting 
name clashes in side output IDs and for side outputs with multiple consumers.


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905414#comment-15905414
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105441679
  
--- Diff: flink-core/src/main/java/org/apache/flink/util/OutputTag.java ---
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.util;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+
+/**
+ * An {@link OutputTag} is a typed and named tag to use for tagging side outputs
+ * of an operator.
+ *
+ * An {@code OutputTag} must always be an anonymous inner class so that Flink can derive
+ * a {@link TypeInformation} for the generic type parameter.
+ *
+ * Example:
+ * {@code
+ * OutputTag> info = new OutputTag>("late-data"){});
+ * }
+ *
+ * @param  the type of elements in the side-output stream.
+ */
+@PublicEvolving
+public class OutputTag implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   private final String id;
+
+   private transient TypeInformation typeInfo;
+
+   /**
+* Creates a new named {@code OutputTag} with the given id.
+*
+* @param id The id of the created {@code OutputTag}.
+ */
+   public OutputTag(String id) {
+   Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
+   this.id = requireNonNull(id);
+
+   try {
+   TypeHint typeHint =
+   new TypeHint(OutputTag.class, this, 0) {};
+   this.typeInfo = typeHint.getTypeInfo();
+   } catch (InvalidTypesException e) {
+   throw new InvalidTypesException("Could not determine TypeInformation for generic " +
+   "OutputTag type. Did you forget to make your OutputTag an anonymous inner class?", e);
+   }
+   }
+
+   /**
+* Creates a new named {@code OutputTag} with the given id and output {@link TypeInformation}.
+*
+* @param id The id of the created {@code OutputTag}.
+* @param typeInfo The {@code TypeInformation} for the side output.
+*/
+   public OutputTag(String id, TypeInformation typeInfo) {
+   this.id = Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
+   this.typeInfo =
+   Preconditions.checkNotNull(typeInfo, "TypeInformation cannot be null.");
+   }
+
+   private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+   in.defaultReadObject();
+   typeInfo = null;
+   }
+
+   public String getId() {
+   return id;
+   }
+
+   public TypeInformation getTypeInfo() {
+   return typeInfo;
+   }
+
+   @Override
+   public boolean equals(Object obj) {
+   return obj instanceof OutputTag
--- End diff --

I see. The problem is that if this does not work, then we can have 
important side effects.
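
One concrete way this can go wrong: if equality looks only at the id, two tags that share an id but declare different element types are indistinguishable to anything keyed on the tag, such as a map from tag to downstream output. A hypothetical sketch of that collision, with a stand-in `NamedTag` class whose type parameter is erased at runtime and plays no part in `equals`:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: id-only equality lets differently typed tags collide. */
final class TagCollisionSketch {
    /** Stand-in for an output tag; the type parameter T is not part of equality. */
    static final class NamedTag<T> {
        final String id;
        NamedTag(String id) { this.id = id; }

        @Override
        public boolean equals(Object o) {
            return o instanceof NamedTag && id.equals(((NamedTag<?>) o).id);
        }

        @Override
        public int hashCode() { return id.hashCode(); }
    }

    /** Registers two differently typed tags with the same id and returns the map size. */
    static int registeredOutputs() {
        Map<NamedTag<?>, String> outputsByTag = new HashMap<>();
        NamedTag<String> strings = new NamedTag<>("late-data");
        NamedTag<Long> longs = new NamedTag<>("late-data");
        outputsByTag.put(strings, "output for String records");
        // Same key as far as the map is concerned: this overwrites the first entry.
        outputsByTag.put(longs, "output for Long records");
        return outputsByTag.size();
    }
}
```

With only one entry left, records emitted for the `String` tag would end up at a consumer expecting `Long` values, and the mismatch would likely surface only later, for example as a `ClassCastException` or a serialization error.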


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905412#comment-15905412
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105441456
  
--- Diff: flink-core/src/main/java/org/apache/flink/util/OutputTag.java ---
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.util;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+
+/**
+ * An {@link OutputTag} is a typed and named tag to use for tagging side outputs
+ * of an operator.
+ *
+ * An {@code OutputTag} must always be an anonymous inner class so that Flink can derive
+ * a {@link TypeInformation} for the generic type parameter.
+ *
+ * Example:
+ * {@code
+ * OutputTag> info = new OutputTag>("late-data"){});
+ * }
+ *
+ * @param  the type of elements in the side-output stream.
+ */
+@PublicEvolving
+public class OutputTag implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   private final String id;
+
+   private transient TypeInformation typeInfo;
+
+   /**
+* Creates a new named {@code OutputTag} with the given id.
+*
+* @param id The id of the created {@code OutputTag}.
+ */
+   public OutputTag(String id) {
+   Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
+   this.id = requireNonNull(id);
+
+   try {
+   TypeHint typeHint =
+   new TypeHint(OutputTag.class, this, 0) {};
+   this.typeInfo = typeHint.getTypeInfo();
+   } catch (InvalidTypesException e) {
+   throw new InvalidTypesException("Could not determine TypeInformation for generic " +
+   "OutputTag type. Did you forget to make your OutputTag an anonymous inner class?", e);
+   }
+   }
+
+   /**
+* Creates a new named {@code OutputTag} with the given id and output {@link TypeInformation}.
+*
+* @param id The id of the created {@code OutputTag}.
+* @param typeInfo The {@code TypeInformation} for the side output.
+*/
+   public OutputTag(String id, TypeInformation typeInfo) {
+   this.id = Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
+   this.typeInfo =
+   Preconditions.checkNotNull(typeInfo, "TypeInformation cannot be null.");
+   }
+
+   private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+   in.defaultReadObject();
+   typeInfo = null;
+   }
+
+   public String getId() {
+   return id;
+   }
+
+   public TypeInformation getTypeInfo() {
+   return typeInfo;
+   }
+
+   @Override
+   public boolean equals(Object obj) {
+   return obj instanceof OutputTag
--- End diff --

I would have liked to include the `TypeInformation` in the check, but we 
can't do that because it's transient. I'll try and figure something out for 
checking that side outputs are unique; it's not as easy as it seems.
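
Since the `TypeInformation` cannot take part in `equals()`, one option is a separate validation pass over all side outputs of a job: remember the type registered for each id and reject a second tag that reuses the id with a different type. A hypothetical sketch of such a check, using `Class<?>` as a stand-in for `TypeInformation` (the class and method names are illustrative, not Flink API):

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical registry that rejects side-output ids reused with a different type. */
final class SideOutputRegistry {
    private final Map<String, Class<?>> typeById = new HashMap<>();

    /**
     * Registers a side output. Registering the same id with the same type again is allowed
     * (for example, several consumers of one side output); a different type is rejected.
     */
    void register(String id, Class<?> type) {
        Class<?> previous = typeById.putIfAbsent(id, type);
        if (previous != null && !previous.equals(type)) {
            throw new IllegalArgumentException(
                "Side output id '" + id + "' is already used with type "
                    + previous.getName() + ", cannot reuse it with " + type.getName());
        }
    }

    int size() {
        return typeById.size();
    }
}
```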


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905390#comment-15905390
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/3484
  
Thanks @kl0u for the (already) quite thorough review! I'll push a commit 
with fixes.


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905381#comment-15905381
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105436408
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ---
@@ -539,5 +625,26 @@ public void collect(StreamRecord record) {
// don't copy for the last output
outputs[outputs.length - 1].collect(record);
}
+
--- End diff --

fixed.


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905380#comment-15905380
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105436193
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ---
@@ -387,14 +403,25 @@ public int getChainLength() {
 
protected final StreamStatusProvider streamStatusProvider;
 
-   public ChainingOutput(OneInputStreamOperator operator, StreamStatusProvider streamStatusProvider) {
+   protected final OutputTag outputTag;
+
+   public ChainingOutput(
+   OneInputStreamOperator operator,
+   StreamStatusProvider streamStatusProvider,
+   OutputTag outputTag) {
this.operator = operator;
this.numRecordsIn = ((OperatorMetricGroup) operator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
this.streamStatusProvider = streamStatusProvider;
+   this.outputTag = outputTag;
}
 
@Override
--- End diff --

This I'm fixing, as I mentioned above.


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905378#comment-15905378
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105435806
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java ---
@@ -72,6 +76,11 @@ public RecordWriterOutput(
 
--- End diff --

I can remove the duplication in `RecordWriterOutput`, `ChainingOutput` 
and `CopyingChainingOutput`, i.e. those `Outputs` that don't forward to 
other outputs but instead push into the operator or into the network. For the 
other `Outputs`, removing the duplication is not possible because inside the 
respective `output()` method they call `output()` of another `Output`. They 
call it either with an `OutputTag` or without, so the method body is not actually 
a duplicate.

I did find another bug, though, where `CopyingBroadcastingOutputCollector` 
in `OperatorChain` was not calling the correct `collect()` method on the 
downstream `Outputs`.  
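
For outputs that only forward to other outputs, the tagged `collect()` has to forward to the tagged `collect()` of each downstream output; that difference is why the two bodies are not duplicates, and why picking the wrong downstream method is a real bug. A simplified, hypothetical sketch of a broadcasting output that keeps the tag (the `Out` interface, `Leaf`, `Broadcasting`, and the `String` tags are stand-ins, not Flink's `Output`/`OutputTag` API):

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of a broadcasting output that must preserve the side-output tag. */
final class BroadcastSketch {
    /** Simplified output: main-output and side-output variants of collect. */
    interface Out {
        void collect(String record);
        void collect(String tag, String record);
    }

    /** Leaf output that accepts either the main output (tag == null) or one side output. */
    static final class Leaf implements Out {
        private final String tag;
        final List<String> received = new ArrayList<>();
        Leaf(String tag) { this.tag = tag; }

        @Override
        public void collect(String record) {
            if (tag == null) { received.add(record); }
        }

        @Override
        public void collect(String tag, String record) {
            if (this.tag != null && this.tag.equals(tag)) { received.add(record); }
        }
    }

    /** Forwards every record to all downstream outputs, keeping the tag intact. */
    static final class Broadcasting implements Out {
        private final List<Out> outputs;
        Broadcasting(List<Out> outputs) { this.outputs = outputs; }

        @Override
        public void collect(String record) {
            for (Out o : outputs) { o.collect(record); }
        }

        @Override
        public void collect(String tag, String record) {
            // Calling o.collect(record) here would drop the tag and misroute the record.
            for (Out o : outputs) { o.collect(tag, record); }
        }
    }
}
```

In this sketch, forwarding a tagged record through the untagged variant would hand it to the main-output `Leaf` instead of the side-output one.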


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905199#comment-15905199
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105409533
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java ---
@@ -72,6 +76,11 @@ public RecordWriterOutput(
 
--- End diff --

We can't throw, because that would crash the program. This is a good catch, 
though! I will remove the duplication.


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905145#comment-15905145
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105402830
  
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java ---
@@ -1618,14 +1664,16 @@ public void testDropDueToLatenessSessionZeroLateness() throws Exception {
stateDesc,
new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()),
EventTimeTrigger.create(),
-   LATENESS);
+   LATENESS,
--- End diff --

wrong alignment


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905139#comment-15905139
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105401808
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ---
@@ -441,26 +491,55 @@ public void close() {
private static final class CopyingChainingOutput extends ChainingOutput {

private final TypeSerializer serializer;
-   
+
public CopyingChainingOutput(
OneInputStreamOperator operator,
TypeSerializer serializer,
+   OutputTag outputTag,
StreamStatusProvider streamStatusProvider) {
-   super(operator, streamStatusProvider);
+   super(operator, streamStatusProvider, outputTag);
this.serializer = serializer;
}
 
--- End diff --

Again, the two `collect()` methods have duplicate code (after the cast).


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905151#comment-15905151
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105400890
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ---
@@ -326,33 +327,48 @@ public int getChainLength() {
Map chainedConfigs,
ClassLoader userCodeClassloader,
Map streamOutputs,
-   List allOperators)
+   List allOperators,
+   OutputTag outputTag)
{
// create the output that the operator writes to first. this may recursively create more operators
Output output = createOutputCollector(
containingTask, operatorConfig, chainedConfigs, userCodeClassloader, streamOutputs, allOperators);
 
// now create the operator and give it the output collector to write its output to
OneInputStreamOperator chainedOperator = operatorConfig.getStreamOperator(userCodeClassloader);
+
chainedOperator.setup(containingTask, operatorConfig, output);
 
allOperators.add(chainedOperator);
 
if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
-   return new ChainingOutput<>(chainedOperator, this);
+   return new ChainingOutput<>(chainedOperator, this, outputTag);
}
else {
TypeSerializer inSerializer = operatorConfig.getTypeSerializerIn1(userCodeClassloader);
-   return new CopyingChainingOutput<>(chainedOperator, inSerializer, this);
+   return new CopyingChainingOutput<>(chainedOperator, inSerializer, outputTag, this);
}
}

private  RecordWriterOutput createStreamOutput(
StreamEdge edge, StreamConfig upStreamConfig, int outputIndex,
Environment taskEnvironment,
-   String taskName)
-   {
-   TypeSerializer outSerializer = upStreamConfig.getTypeSerializerOut(taskEnvironment.getUserClassLoader());
+   String taskName) {
+   OutputTag sideOutputTag = edge.getOutputTag(); // OutputTag, return null if not sideOutput
+
+   TypeSerializer outSerializer = null;
+
--- End diff --

There is too much line breaking in the following `if () else ()`. We could easily 
reduce it or even replace it with:
```
TypeSerializer outSerializer = (edge.getOutputTag() != null) ?
    upStreamConfig.getTypeSerializerSideOut(edge.getOutputTag(), taskEnvironment.getUserClassLoader()) :
    upStreamConfig.getTypeSerializerOut(taskEnvironment.getUserClassLoader());
```




[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905149#comment-15905149
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105402899
  
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java ---
@@ -1702,15 +1754,16 @@ public void testDropDueToLatenessSessionWithLatenessPurgingTrigger() throws Exce
stateDesc,
new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()),
PurgingTrigger.of(EventTimeTrigger.create()),
-   LATENESS);
+   LATENESS,
--- End diff --

wrong alignment




[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905137#comment-15905137
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105397306
  
--- Diff: flink-core/src/main/java/org/apache/flink/util/OutputTag.java ---
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.util;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+
+/**
+ * An {@link OutputTag} is a typed and named tag to use for tagging side outputs
+ * of an operator.
+ *
+ * An {@code OutputTag} must always be an anonymous inner class so that Flink can derive
+ * a {@link TypeInformation} for the generic type parameter.
+ *
+ * Example:
+ * {@code
+ * OutputTag> info = new OutputTag>("late-data"){});
+ * }
+ *
+ * @param  the type of elements in the side-output stream.
+ */
+@PublicEvolving
+public class OutputTag implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   private final String id;
+
+   private transient TypeInformation typeInfo;
+
+   /**
+* Creates a new named {@code OutputTag} with the given id.
+*
+* @param id The id of the created {@code OutputTag}.
+ */
+   public OutputTag(String id) {
+   Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
--- End diff --

We do not need both lines with the checks. We can just have:

`this.id = Preconditions.checkNotNull(id, "OutputTag id cannot be null.");`
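The one-line form works because Flink's `Preconditions.checkNotNull(reference, errorMessage)` returns the reference it checks, just like the JDK's `Objects.requireNonNull(obj, message)`. A minimal sketch of the combined check-and-assign, using the JDK method and a hypothetical `TagSketch` class in place of `OutputTag`:

```java
import java.util.Objects;

// Hypothetical stand-in for the reviewed OutputTag constructor (not Flink code).
class TagSketch {
    private final String id;

    TagSketch(String id) {
        // requireNonNull throws a NullPointerException carrying the message when id is
        // null and otherwise returns id, so the check and the assignment are one statement.
        this.id = Objects.requireNonNull(id, "OutputTag id cannot be null.");
    }

    String getId() {
        return id;
    }
}
```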




[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905144#comment-15905144
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105403355
  
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java ---
@@ -40,6 +41,12 @@ public void collect(StreamRecord record) {
}
 
@Override
+   public  void collect(
--- End diff --

The signature fits on one line.




[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905150#comment-15905150
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105397483
  
--- Diff: flink-core/src/main/java/org/apache/flink/util/OutputTag.java ---
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.util;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+
+/**
+ * An {@link OutputTag} is a typed and named tag to use for tagging side outputs
+ * of an operator.
+ *
+ * An {@code OutputTag} must always be an anonymous inner class so that Flink can derive
+ * a {@link TypeInformation} for the generic type parameter.
+ *
+ * Example:
+ * {@code
+ * OutputTag> info = new OutputTag>("late-data"){});
+ * }
+ *
+ * @param  the type of elements in the side-output stream.
+ */
+@PublicEvolving
+public class OutputTag implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   private final String id;
+
+   private transient TypeInformation typeInfo;
+
+   /**
+* Creates a new named {@code OutputTag} with the given id.
+*
+* @param id The id of the created {@code OutputTag}.
+ */
+   public OutputTag(String id) {
+   Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
+   this.id = requireNonNull(id);
+
+   try {
+   TypeHint typeHint =
+   new TypeHint(OutputTag.class, this, 0) {};
+   this.typeInfo = typeHint.getTypeInfo();
+   } catch (InvalidTypesException e) {
+   throw new InvalidTypesException("Could not determine TypeInformation for generic " +
+   "OutputTag type. Did you forget to make your OutputTag an anonymous inner class?", e);
+   }
+   }
+
+   /**
+* Creates a new named {@code OutputTag} with the given id and output {@link TypeInformation}.
+*
+* @param id The id of the created {@code OutputTag}.
+* @param typeInfo The {@code TypeInformation} for the side output.
+*/
+   public OutputTag(String id, TypeInformation typeInfo) {
+   this.id = Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
+   this.typeInfo =
--- End diff --

No need for line breaking.




[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905138#comment-15905138
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105398836
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java ---
@@ -300,6 +303,36 @@ private StreamGraph generateInternal(List transformatio
}
 
/**
+* Transforms a {@code SideOutputTransformation}.
+*
+* 
+* For this we create a virtual node in the {@code StreamGraph} that holds the side-output
+* {@link org.apache.flink.util.OutputTag}.
+*
+* @see org.apache.flink.streaming.api.graph.StreamGraphGenerator
+*/
+   private  Collection transformSideOutput(SideOutputTransformation sideOutput) {
+   StreamTransformation input = sideOutput.getInput();
+   Collection resultIds = transform(input);
+
+
+   // the recursive transform might have already transformed this
+   if (alreadyTransformed.containsKey(sideOutput)) {
+   return alreadyTransformed.get(sideOutput);
+   }
+
+   List virtualResultIds = new ArrayList<>();
+
+   for (int inputId : resultIds) {
+   int virtualId = StreamTransformation.getNewNodeId();
+   streamGraph.addVirtualSideOutputNode(inputId, virtualId, sideOutput.getOutputTag());
+   virtualResultIds.add(virtualId);
+   }
+   return virtualResultIds;
+   }
+
+
--- End diff --

Leave only one empty line.




[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905143#comment-15905143
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105402616
  
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java ---
@@ -1267,14 +1291,16 @@ public void testLateness() throws Exception {
stateDesc,
new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction>()),
PurgingTrigger.of(EventTimeTrigger.create()),
-   LATENESS);
+   LATENESS,
+   lateOutputTag);
--- End diff --

wrong alignment




[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905142#comment-15905142
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105401503
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ---
@@ -387,14 +403,25 @@ public int getChainLength() {
 
protected final StreamStatusProvider streamStatusProvider;
 
-   public ChainingOutput(OneInputStreamOperator operator, StreamStatusProvider streamStatusProvider) {
+   protected final OutputTag outputTag;
+
+   public ChainingOutput(
+   OneInputStreamOperator operator,
+   StreamStatusProvider streamStatusProvider,
+   OutputTag outputTag) {
this.operator = operator;
this.numRecordsIn = ((OperatorMetricGroup) operator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
this.streamStatusProvider = streamStatusProvider;
+   this.outputTag = outputTag;
}
 
@Override
--- End diff --

Again, the two `collect()` methods share a lot of identical code. We can put that common code in a separate private method and call it instead of repeating the code.




[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905147#comment-15905147
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105402678
  
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java ---
@@ -1393,14 +1428,16 @@ public void testDropDueToLatenessTumbling() throws Exception {
stateDesc,
new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction>()),
EventTimeTrigger.create(),
-   LATENESS);
+   LATENESS,
+   lateOutputTag);
--- End diff --

wrong alignment




[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905153#comment-15905153
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105403240
  
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectorOutput.java ---
@@ -53,5 +54,11 @@ public void collect(StreamRecord record) {
}
 
@Override
--- End diff --

The signature can go on the same line




[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905146#comment-15905146
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105398702
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java ---
@@ -333,32 +356,40 @@ public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int ty
downStreamVertexID,
typeNumber,
null,
-   new ArrayList());
+   new ArrayList(), null);
 
}
 
private void addEdgeInternal(Integer upStreamVertexID,
Integer downStreamVertexID,
int typeNumber,
StreamPartitioner partitioner,
-   List outputNames) {
-
+   List outputNames,
+   OutputTag outputTag) {
 
-   if (virtualSelectNodes.containsKey(upStreamVertexID)) {
+   if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
+   int virtualId = upStreamVertexID;
upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;
+   if (outputTag == null) {
outputTag = virtualSideOutputNodes.get(virtualId).f1;
+   }
addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, null, outputTag);
+   } else if (virtualSelectNodes.containsKey(upStreamVertexID)) {
int virtualId = upStreamVertexID;
upStreamVertexID = virtualSelectNodes.get(virtualId).f0;
if (outputNames.isEmpty()) {
// selections that happen downstream override earlier selections
outputNames = virtualSelectNodes.get(virtualId).f1;
}
-   addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames);
+   addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag);
} else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
int virtualId = upStreamVertexID;
upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
if (partitioner == null) {
partitioner = virtualPartitionNodes.get(virtualId).f1;
}
-   addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames);
+
--- End diff --

Remove this empty line for uniformity with the `if() ...` branch above.
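The side-output branch in `addEdgeInternal()` follows the same pattern as the select and partition branches: an edge that starts at a virtual node is re-attached to the real upstream node and picks up the information stored on the virtual node. A minimal, simplified sketch of that resolution (hypothetical `VirtualNodeSketch` class; plain maps replace the `Tuple2` entries, and a `String` replaces `OutputTag`), keeping the diff's rule that a tag that is already set is not overwritten:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of virtual side-output nodes; not Flink's StreamGraph.
class VirtualNodeSketch {
    static final class Edge {
        final int source;
        final int target;
        final String outputTag; // null for the main output

        Edge(int source, int target, String outputTag) {
            this.source = source;
            this.target = target;
            this.outputTag = outputTag;
        }
    }

    // virtual node id -> real upstream node id, and virtual node id -> side-output tag
    private final Map<Integer, Integer> virtualInput = new HashMap<>();
    private final Map<Integer, String> virtualTag = new HashMap<>();

    void addVirtualSideOutputNode(int inputId, int virtualId, String tag) {
        if (virtualInput.containsKey(virtualId)) {
            throw new IllegalStateException("Already has virtual node " + virtualId);
        }
        virtualInput.put(virtualId, inputId);
        virtualTag.put(virtualId, tag);
    }

    Edge addEdge(int upstreamId, int downstreamId) {
        return addEdgeInternal(upstreamId, downstreamId, null);
    }

    private Edge addEdgeInternal(int upstreamId, int downstreamId, String outputTag) {
        if (virtualInput.containsKey(upstreamId)) {
            // The first virtual node reached (nearest the downstream operator) decides the tag.
            String tag = (outputTag == null) ? virtualTag.get(upstreamId) : outputTag;
            return addEdgeInternal(virtualInput.get(upstreamId), downstreamId, tag);
        }
        // upstreamId is a real node: create the physical edge carrying the resolved tag.
        return new Edge(upstreamId, downstreamId, outputTag);
    }
}
```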




[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905155#comment-15905155
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105397933
  
--- Diff: flink-core/src/main/java/org/apache/flink/util/OutputTag.java ---
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.util;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+
+/**
+ * An {@link OutputTag} is a typed and named tag to use for tagging side outputs
+ * of an operator.
+ *
+ * An {@code OutputTag} must always be an anonymous inner class so that Flink can derive
+ * a {@link TypeInformation} for the generic type parameter.
+ *
+ * Example:
+ * {@code
+ * OutputTag> info = new OutputTag>("late-data"){});
+ * }
+ *
+ * @param  the type of elements in the side-output stream.
+ */
+@PublicEvolving
+public class OutputTag implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   private final String id;
+
+   private transient TypeInformation typeInfo;
+
+   /**
+* Creates a new named {@code OutputTag} with the given id.
+*
+* @param id The id of the created {@code OutputTag}.
+ */
+   public OutputTag(String id) {
+   Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
+   this.id = requireNonNull(id);
+
+   try {
+   TypeHint typeHint =
+   new TypeHint(OutputTag.class, this, 0) {};
+   this.typeInfo = typeHint.getTypeInfo();
+   } catch (InvalidTypesException e) {
+   throw new InvalidTypesException("Could not determine TypeInformation for generic " +
+   "OutputTag type. Did you forget to make your OutputTag an anonymous inner class?", e);
+   }
+   }
+
+   /**
+* Creates a new named {@code OutputTag} with the given id and output {@link TypeInformation}.
+*
+* @param id The id of the created {@code OutputTag}.
+* @param typeInfo The {@code TypeInformation} for the side output.
+*/
+   public OutputTag(String id, TypeInformation typeInfo) {
+   this.id = Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
+   this.typeInfo =
+   Preconditions.checkNotNull(typeInfo, "TypeInformation cannot be null.");
+   }
+
+   private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+   in.defaultReadObject();
+   typeInfo = null;
+   }
+
+   public String getId() {
+   return id;
+   }
+
+   public TypeInformation getTypeInfo() {
+   return typeInfo;
+   }
+
+   @Override
+   public boolean equals(Object obj) {
+   return obj instanceof OutputTag
--- End diff --

Two points:
1) We cannot have `this.id == null` or `((OutputTag) obj).id == null`, because we check for null in the constructor, so this method can be simplified.
2) We never check the uniqueness of `outputTag.id`. We should do it during translation. This is also a correctness issue, as it may result in undesired side-output "collisions".
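Both points can be sketched with hypothetical stand-ins: `TagId` plays the role of `OutputTag` (a `Class` replaces `TypeInformation`), and `TagRegistry` is an assumed translation-time check. Because the constructor already rejects a null id, `equals()` and `hashCode()` need no null guards. The registry policy shown (reject an id that is reused with a different type, accept the same tag twice) is one possible reading of "uniqueness", not necessarily what the PR adopted.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for OutputTag: id is never null, so equals() can use it directly.
final class TagId {
    private final String id;
    private final Class<?> type; // stands in for TypeInformation

    TagId(String id, Class<?> type) {
        this.id = Objects.requireNonNull(id, "OutputTag id cannot be null.");
        this.type = Objects.requireNonNull(type, "type cannot be null.");
    }

    String getId() {
        return id;
    }

    Class<?> getType() {
        return type;
    }

    @Override
    public boolean equals(Object obj) {
        return obj instanceof TagId && id.equals(((TagId) obj).id);
    }

    @Override
    public int hashCode() {
        return id.hashCode();
    }
}

// Hypothetical translation-time check that prevents two differently typed side outputs
// from silently sharing one id.
class TagRegistry {
    private final Map<String, TagId> seen = new HashMap<>();

    void register(TagId tag) {
        TagId previous = seen.putIfAbsent(tag.getId(), tag);
        if (previous != null && !previous.getType().equals(tag.getType())) {
            throw new IllegalArgumentException(
                    "OutputTag id '" + tag.getId() + "' is already used with a different type.");
        }
    }
}
```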



[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905148#comment-15905148
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105402262
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ---
@@ -539,5 +625,26 @@ public void collect(StreamRecord record) {
// don't copy for the last output
outputs[outputs.length - 1].collect(record);
}
+
+   @Override
+   public  void collect(
+   OutputTag outputTag, StreamRecord record) {
+   for (int i = 0; i < outputs.length - 1; i++) {
+   Output output = outputs[i];
+
+   // due to side outputs, StreamRecords of varying types can pass through the broadcasting
+   // collector so we need to cast
+   @SuppressWarnings({"unchecked", "rawtypes"})
+   StreamRecord shallowCopy = (StreamRecord) record.copy(record.getValue());
+   output.collect(shallowCopy);
+   }
+
--- End diff --

This duplicates the comment inside the loop. Also, the "// don't copy for the last output" comment is not right, because we actually create a copy before.
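For reference, the convention that the "don't copy for the last output" comment describes can be sketched with hypothetical, simplified types. Flink's `StreamRecord` is a mutable wrapper that downstream code may modify in place, so each output except the last receives its own shallow copy (a new wrapper that shares the value), and the last output receives the original. If the side-output variant also copies for the last output, that comment no longer applies to it.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified broadcasting collector; not Flink's BroadcastingOutputCollector.
class BroadcastSketch {
    // Mutable record wrapper; downstream code may overwrite its fields in place.
    static final class Record {
        Object value;
        long timestamp;

        Record(Object value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }

        // Shallow copy: a new wrapper that shares the same value object.
        Record copy() {
            return new Record(value, timestamp);
        }
    }

    private final List<Consumer<Record>> outputs;

    BroadcastSketch(List<Consumer<Record>> outputs) {
        this.outputs = outputs;
    }

    void collect(Record record) {
        int last = outputs.size() - 1;
        // Every output but the last gets its own wrapper...
        for (int i = 0; i < last; i++) {
            outputs.get(i).accept(record.copy());
        }
        // ...and the last one reuses the original, saving one copy.
        if (last >= 0) {
            outputs.get(last).accept(record);
        }
    }
}
```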




[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905140#comment-15905140
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105402746
  
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java ---
@@ -1528,14 +1572,16 @@ public void testDropDueToLatenessSessionZeroLatenessPurgingTrigger() throws Exce
stateDesc,
new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()),
PurgingTrigger.of(EventTimeTrigger.create()),
-   LATENESS);
+   LATENESS,
+   lateOutputTag);
--- End diff --

wrong alignment




[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905135#comment-15905135
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105397424
  
--- Diff: flink-core/src/main/java/org/apache/flink/util/OutputTag.java ---
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.util;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+
+/**
+ * An {@link OutputTag} is a typed and named tag to use for tagging side outputs
+ * of an operator.
+ *
+ * An {@code OutputTag} must always be an anonymous inner class so that Flink can derive
+ * a {@link TypeInformation} for the generic type parameter.
+ *
+ * Example:
+ * {@code
+ * OutputTag> info = new OutputTag>("late-data"){});
+ * }
+ *
+ * @param  the type of elements in the side-output stream.
+ */
+@PublicEvolving
+public class OutputTag implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   private final String id;
+
+   private transient TypeInformation typeInfo;
+
+   /**
+* Creates a new named {@code OutputTag} with the given id.
+*
+* @param id The id of the created {@code OutputTag}.
+ */
+   public OutputTag(String id) {
+   Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
+   this.id = requireNonNull(id);
+
+   try {
--- End diff --

No need for line breaking:
`TypeHint typeHint = new TypeHint(OutputTag.class, this, 0) {};`




[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905134#comment-15905134
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105397096
  
--- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeHint.java ---
@@ -46,7 +46,15 @@
public TypeHint() {
this.typeInfo = TypeExtractor.createTypeInfo(this, TypeHint.class, getClass(), 0);
}
-   
+
+   /**
+* Creates a hint for the generic type in the class signature.
+*/
+   public TypeHint(Class baseClass, Object instance, int genericParameterPos) {
+   this.typeInfo = TypeExtractor.createTypeInfo(instance, baseClass, instance.getClass(), genericParameterPos);
+   }
+
+
--- End diff --

Remove one of the 2 empty lines.
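The new `TypeHint(Class baseClass, Object instance, int genericParameterPos)` constructor generalizes the same trick: instead of reading the type argument of `TypeHint` itself, it reads the type argument at a given position of another generic base class (here `OutputTag`), as seen from a concrete instance. A rough JDK-only sketch of that lookup follows, with hypothetical names `GenericParams.resolve` and `Pair`. It is not how `TypeExtractor` is implemented and, unlike it, does not resolve type variables forwarded through intermediate generic classes.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

/** Hypothetical example base class with two type parameters. */
class Pair<A, B> {
}

/** Hypothetical helper; not Flink's TypeExtractor. */
final class GenericParams {
    private GenericParams() {
    }

    /**
     * Walks up the class hierarchy of {@code instance} until it finds a class whose
     * generic superclass is a parameterization of {@code baseClass}, and returns the
     * type argument at {@code pos}. May return a TypeVariable if that class forwarded
     * one of its own type parameters.
     */
    static Type resolve(Class<?> baseClass, Object instance, int pos) {
        Class<?> current = instance.getClass();
        while (current != null && current != Object.class) {
            Type superType = current.getGenericSuperclass();
            if (superType instanceof ParameterizedType
                    && ((ParameterizedType) superType).getRawType() == baseClass) {
                return ((ParameterizedType) superType).getActualTypeArguments()[pos];
            }
            current = current.getSuperclass();
        }
        throw new IllegalArgumentException(
                instance.getClass().getName() + " does not parameterize " + baseClass.getName());
    }
}
```

For `new Pair<String, Integer>() {}`, position 0 resolves to `String` and position 1 to `Integer`; a plain `new Pair<String, Integer>()` carries no type arguments at runtime and is rejected.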


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905154#comment-15905154
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105401947
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ---
@@ -539,5 +625,26 @@ public void collect(StreamRecord record) {
// don't copy for the last output
outputs[outputs.length - 1].collect(record);
}
+
--- End diff --

The arguments can go on the same line as the method name.


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905136#comment-15905136
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105399939
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java ---
@@ -72,6 +76,11 @@ public RecordWriterOutput(
 
--- End diff --

The bodies of the following two `collect(...)` methods are identical modulo 
the check for the output tag.
It makes sense to put the common code in a single `private` method and call 
that method from each of them (after doing the necessary checks).

As a side comment, should we just `return` if the wrong method is called, or 
throw an exception?

This "duplicate code" pattern appears in several places in the code.
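One way to apply that suggestion, sketched with simplified, hypothetical types (`TaggedWriterOutput` is not Flink's `RecordWriterOutput`, tags are plain strings, and `pushToWriter` just appends to a list): both `collect(...)` variants do their own tag check and then delegate to one private method. The sketch returns silently on a mismatch, on the assumption that every record is fanned out to all outputs of an operator, so each output routinely sees records that are not meant for it; throwing would only make sense if that assumption does not hold.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical, simplified output that forwards either the main output
 * (outputTag == null) or exactly one side output (outputTag != null).
 */
final class TaggedWriterOutput<T> {
    private final String outputTag;
    private final List<T> written = new ArrayList<>();

    TaggedWriterOutput(String outputTag) {
        this.outputTag = outputTag;
    }

    /** Main-output record: only forwarded if this output is not tagged. */
    void collect(T record) {
        if (outputTag != null) {
            return; // this output only serves a side output
        }
        pushToWriter(record);
    }

    /** Side-output record: only forwarded if the tag matches this output's tag. */
    void collect(String tag, T record) {
        if (outputTag == null || !outputTag.equals(tag)) {
            return; // not the side output this instance is responsible for
        }
        pushToWriter(record);
    }

    /** The formerly duplicated body now lives in one place. */
    private void pushToWriter(T record) {
        written.add(record);
    }

    List<T> written() {
        return Collections.unmodifiableList(written);
    }
}
```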


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905141#comment-15905141
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105399457
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SideOutputTransformation.java ---
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.transformations;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+
+import java.util.Collection;
+import java.util.List;
+
+
+/**
+ * This transformation represents a selection of a side output of an upstream operation with a
+ * given {@link OutputTag}.
+ *
+ * This does not create a physical operation, it only affects how upstream operations are
+ * connected to downstream operations.
+ *
+ * @param <T> The type of the elements that result from this {@code SideOutputTransformation}
+ */
+public class SideOutputTransformation<T> extends StreamTransformation<T> {
+   private final StreamTransformation input;
--- End diff --

Leave a blank line here.



[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905152#comment-15905152
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105398138
  
--- Diff: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java ---
@@ -17,13 +17,19 @@
 
 package org.apache.flink.streaming.examples.windowing;
 
--- End diff --

Unused imports.


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15904878#comment-15904878
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105372676
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---
@@ -419,6 +435,14 @@ public void merge(W mergeResult,
registerCleanupTimer(window);
}
}
+
+   // side output input event if
+   // element not handled by any window
+   // late arriving tag has been set
+   // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
+   if(isSkippedElement && lateDataOutputTag != null && isLate(element)) {
--- End diff --

I just added a test for the behaviour with a "weird" `WindowAssigner`. 


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15904842#comment-15904842
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105368360
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---
@@ -419,6 +435,14 @@ public void merge(W mergeResult,
registerCleanupTimer(window);
}
}
+
+   // side output input event if
+   // element not handled by any window
+   // late arriving tag has been set
+   // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
+   if(isSkippedElement && lateDataOutputTag != null && isLate(element)) {
--- End diff --

I thought about this again. I think it doesn't hurt to have it because it 
catches the case when a `WindowAssigner` doesn't assign any windows. In that 
case an element is also "skipped" but it is not necessarily considered late. 
What do you think?
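The condition under discussion can be modeled as a plain predicate. The sketch below is a simplified, hypothetical model (`LateDataRouting` is not `WindowOperator` code). It assumes the usual event-time lateness rule: an element is late once its timestamp plus the allowed lateness is at or below the current watermark.

```java
/** Hypothetical model of the late-data side-output decision; not Flink code. */
final class LateDataRouting {
    private LateDataRouting() {
    }

    /** Assumed lateness rule: event time, and timestamp + allowedLateness <= watermark. */
    static boolean isLate(boolean eventTime, long timestamp, long allowedLateness, long currentWatermark) {
        return eventTime && timestamp + allowedLateness <= currentWatermark;
    }

    /**
     * Side-output the element only if no window accepted it, a late-data tag is
     * configured, and the element is actually late.
     */
    static boolean sendToLateDataOutput(boolean isSkippedElement, String lateDataTag,
                                        boolean eventTime, long timestamp,
                                        long allowedLateness, long currentWatermark) {
        return isSkippedElement
                && lateDataTag != null
                && isLate(eventTime, timestamp, allowedLateness, currentWatermark);
    }
}
```

With a watermark of 100 and an allowed lateness of 10, a skipped element with timestamp 50 goes to the side output (50 + 10 <= 100), while a skipped element with timestamp 95 does not (95 + 10 > 100). The second case is the one where a `WindowAssigner` returned no windows, so the element was skipped without being late.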


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15903561#comment-15903561
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user chenqin commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105235710
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---
@@ -419,6 +435,14 @@ public void merge(W mergeResult,
registerCleanupTimer(window);
}
}
+
+   // side output input event if
+   // element not handled by any window
+   // late arriving tag has been set
+   // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
+   if(isSkippedElement && lateDataOutputTag != null && isLate(element)) {
--- End diff --

Thanks @kl0u, good catch!

I put `isLate` there with the intention of filtering out events that are 
dropped for other reasons I may not be aware of. lateArrivingEvents are really 
events that are both `late arriving` and `dropped`.

@aljoscha If that is a redundant check, we might just remove `isLate`. 
What do you think?


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15903460#comment-15903460
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105225561
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---
@@ -419,6 +435,14 @@ public void merge(W mergeResult,
registerCleanupTimer(window);
}
}
+
+   // side output input event if
+   // element not handled by any window
+   // late arriving tag has been set
+   // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
+   if(isSkippedElement && lateDataOutputTag != null && isLate(element)) {
--- End diff --

@chenqin and @aljoscha I am starting to review the PR, and I was wondering 
when this new `isLate()` check is needed. At least for the out-of-the-box 
window assigners, it seems to be redundant.


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902850#comment-15902850
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user uce commented on the issue:

https://github.com/apache/flink/pull/3484
  
Unfortunately, I won't have time to look over this PR this week. Thanks for 
pinging me @aljoscha @chenqin.


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15901828#comment-15901828
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user chenqin commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r104997566
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java ---
@@ -60,6 +60,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import com.google.common.collect.Iterables;
--- End diff --

That sounds right, good catch! 
Thanks for fixing!


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15901801#comment-15901801
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user chenqin commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r104996349
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java ---
@@ -85,6 +86,7 @@
private Set<Integer> sources;
private Set<Integer> sinks;
private Map<Integer, Tuple2<Integer, List<String>>> virtualSelectNodes;
+   private Map<Integer, Tuple2<Integer, OutputTag>> virtualOutputNodes;
--- End diff --

sounds good


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15901795#comment-15901795
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user chenqin commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r104995971
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java ---
@@ -333,32 +356,41 @@ public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int ty
downStreamVertexID,
typeNumber,
null,
-   new ArrayList());
+   new ArrayList(), null);
 
}
 
private void addEdgeInternal(Integer upStreamVertexID,
Integer downStreamVertexID,
int typeNumber,
StreamPartitioner partitioner,
-   List outputNames) {
-
+   List outputNames,
+   OutputTag outputTag) {
 
-   if (virtualSelectNodes.containsKey(upStreamVertexID)) {
+   if (virtualOutputNodes.containsKey(upStreamVertexID)) {
+   int virtualId = upStreamVertexID;
+   upStreamVertexID = virtualOutputNodes.get(virtualId).f0;
+   if (outputTag == null) {
+   // selections that happen downstream override earlier selections
--- End diff --

sounds good to me!


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15901094#comment-15901094
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r104895136
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java ---
@@ -333,32 +356,41 @@ public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int ty
downStreamVertexID,
typeNumber,
null,
-   new ArrayList());
+   new ArrayList(), null);
 
}
 
private void addEdgeInternal(Integer upStreamVertexID,
Integer downStreamVertexID,
int typeNumber,
StreamPartitioner partitioner,
-   List outputNames) {
-
+   List outputNames,
+   OutputTag outputTag) {
 
-   if (virtualSelectNodes.containsKey(upStreamVertexID)) {
+   if (virtualOutputNodes.containsKey(upStreamVertexID)) {
+   int virtualId = upStreamVertexID;
+   upStreamVertexID = virtualOutputNodes.get(virtualId).f0;
+   if (outputTag == null) {
+   // selections that happen downstream override earlier selections
--- End diff --

I think the comment is a leftover from copying this code from split/select. 
For side outputs it can't happen that you have multiple "selects" after one 
another. Will remove the comment. What do you think?
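To illustrate the point, here is a heavily simplified, hypothetical model of a stream graph with virtual side-output nodes (`MiniStreamGraph` and its members are illustrative names, not Flink's `StreamGraph` API, and tags are plain strings). A virtual node only remembers the physical upstream node and the tag; when an edge is later added from the virtual node, the edge is rewritten to start at the physical node and carry the tag. Because, as noted above, side-output selections cannot be stacked, there is no earlier selection to override.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, heavily simplified stream graph with virtual side-output nodes. */
final class MiniStreamGraph {

    /** A physical edge; outputTag == null means the main output. */
    static final class Edge {
        final int source;
        final int target;
        final String outputTag;

        Edge(int source, int target, String outputTag) {
            this.source = source;
            this.target = target;
            this.outputTag = outputTag;
        }
    }

    /** Virtual node id -> (physical upstream node id, side-output tag). */
    private final Map<Integer, Map.Entry<Integer, String>> virtualSideOutputNodes = new HashMap<>();
    private final List<Edge> edges = new ArrayList<>();

    void addVirtualSideOutputNode(int upstreamId, int virtualId, String tag) {
        virtualSideOutputNodes.put(virtualId, new AbstractMap.SimpleImmutableEntry<>(upstreamId, tag));
    }

    void addEdge(int upstreamId, int downstreamId) {
        addEdgeInternal(upstreamId, downstreamId, null);
    }

    private void addEdgeInternal(int upstreamId, int downstreamId, String outputTag) {
        Map.Entry<Integer, String> virtual = virtualSideOutputNodes.get(upstreamId);
        if (virtual != null) {
            // Rewrite the edge to start at the physical node and carry the tag.
            // Side-output selections cannot be stacked, so there is no earlier
            // selection that this one would have to override.
            addEdgeInternal(virtual.getKey(), downstreamId, virtual.getValue());
        } else {
            edges.add(new Edge(upstreamId, downstreamId, outputTag));
        }
    }

    List<Edge> getEdges() {
        return edges;
    }
}
```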


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15900996#comment-15900996
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r104881269
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java ---
@@ -60,6 +60,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import com.google.common.collect.Iterables;
--- End diff --

You're right, I'm changing this to simply have two loops.

I think you introduced this in the first place, though.  
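For reference, replacing Guava's `Iterables.concat` with two plain loops looks roughly like this. The helper name `forEachInBoth` is hypothetical, since the diff excerpt does not show which collections were being concatenated. The gain is one fewer Guava import; the cost is a repeated loop body, which a small `Consumer` keeps in one place.

```java
import java.util.function.Consumer;

/** Hypothetical helper: visit two iterables in order without Iterables.concat. */
final class TwoLoops {
    private TwoLoops() {
    }

    static <T> void forEachInBoth(Iterable<? extends T> first,
                                  Iterable<? extends T> second,
                                  Consumer<? super T> action) {
        // Same effect as: for (T t : Iterables.concat(first, second)) action.accept(t);
        for (T item : first) {
            action.accept(item);
        }
        for (T item : second) {
            action.accept(item);
        }
    }
}
```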


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15900991#comment-15900991
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r104880615
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java ---
@@ -85,6 +86,7 @@
private Set<Integer> sources;
private Set<Integer> sinks;
private Map<Integer, Tuple2<Integer, List<String>>> virtualSelectNodes;
+   private Map<Integer, Tuple2<Integer, OutputTag>> virtualOutputNodes;
--- End diff --

The method is already called `addVirtualSideOutputNode()`. I'm adjusting 
the name of the field. Thanks!


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15900988#comment-15900988
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r104880412
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java ---
@@ -416,4 +418,35 @@ private boolean canBeParallel() {
transformation.setSlotSharingGroup(slotSharingGroup);
return this;
}
+
+   /**
+* Gets the {@link DataStream} that contains the elements that are emitted from an operation
+* into the side output with the given {@link OutputTag}.
+*
+* Example:
+* {@code
+* static final OutputTag sideOutputTag = new OutputTag("side-output") {};
+*
+* public void flatMap(X value, Collector out) throws Exception {
--- End diff --

Fixing


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15900753#comment-15900753
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user chenqin commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r104847832
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java ---
@@ -439,6 +450,7 @@ public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
&& (edge.getPartitioner() instanceof ForwardPartitioner)
&& upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
+   && edge.getOutputTag() == null // disable chaining for side outputs
--- End diff --

I remember you mentioned that side outputs work with chaining in the latest 
version. Is that right?


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15900754#comment-15900754
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user chenqin commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r104847733
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java ---
@@ -60,6 +60,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import com.google.common.collect.Iterables;
--- End diff --

Do you think introducing this dependency is a good idea or a bad idea? Up 
to you :)


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15900757#comment-15900757
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user chenqin commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r104847005
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java ---
@@ -85,6 +86,7 @@
private Set<Integer> sources;
private Set<Integer> sinks;
private Map<Integer, Tuple2<Integer, List<String>>> virtualSelectNodes;
+   private Map<Integer, Tuple2<Integer, OutputTag>> virtualOutputNodes;
--- End diff --

We might consider using `addVirtualSideOutputNode` and 
`virtualSideOutputNodes`, unless we want to refactor away from the current 
assumption `operator` to `<... operator 
<...`


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15900755#comment-15900755
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user chenqin commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r104847407
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java ---
@@ -333,32 +356,41 @@ public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int ty
downStreamVertexID,
typeNumber,
null,
-   new ArrayList());
+   new ArrayList(), null);
 
}
 
private void addEdgeInternal(Integer upStreamVertexID,
Integer downStreamVertexID,
int typeNumber,
StreamPartitioner partitioner,
-   List outputNames) {
-
+   List outputNames,
+   OutputTag outputTag) {
 
-   if (virtualSelectNodes.containsKey(upStreamVertexID)) {
+   if (virtualOutputNodes.containsKey(upStreamVertexID)) {
+   int virtualId = upStreamVertexID;
+   upStreamVertexID = virtualOutputNodes.get(virtualId).f0;
+   if (outputTag == null) {
+   // selections that happen downstream override earlier selections
--- End diff --

We may want to call out this behavior in the `getSideOutput` comments.


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15900756#comment-15900756
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user chenqin commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r104846393
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java ---
@@ -416,4 +418,35 @@ private boolean canBeParallel() {
transformation.setSlotSharingGroup(slotSharingGroup);
return this;
}
+
+   /**
+* Gets the {@link DataStream} that contains the elements that are emitted from an operation
+* into the side output with the given {@link OutputTag}.
+*
+* Example:
+* {@code
+* static final OutputTag sideOutputTag = new OutputTag("side-output") {};
+*
+* public void flatMap(X value, Collector out) throws Exception {
--- End diff --

The comments seem out of date; I think we already decided to get rid of 
`CollectorWrapper`.


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15899255#comment-15899255
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/3484

[FLINK-4460] Side Outputs in Flink

This is a refinement of #2982 by @chenqin.

I changed the API a bit, added support for side outputs to
`ProcessFunction`, enabled side outputs to work with chaining, added a proper
Scala API and a Scala API test, and added documentation.

R: @uce @kl0u and @chenqin for review, please

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink finish-pr-2982-side-outputs-cp

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3484.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3484


commit 1746c7e5981942dfb0e57954f8241a400b699120
Author: Chen Qin 
Date:   2016-10-21T19:38:04Z

[FLINK-4460] Add support for side outputs

This does not yet allow users to emit to side outputs in user functions.
Only operators (StreamOperator) can emit to side outputs. A side output
can be retrieved on a SingleOutputStreamOperator.

commit 362fcb38abf525e704e97505186923ae77dc167b
Author: Aljoscha Krettek 
Date:   2016-10-21T19:38:04Z

[FLINK-4460] Add side outputs for ProcessFunction

commit 3ff4f8284691e9cbfba6f4c23b4e7fe1c584df50
Author: Aljoscha Krettek 
Date:   2017-02-16T13:09:26Z

[FLINK-4460] Make chaining work with side outputs

commit 5a48cdfb67a1008bf4c7cba8ff76b2982db55a40
Author: Aljoscha Krettek 
Date:   2017-02-16T13:41:10Z

[FLINK-4460] Expose OutputTag constructor that takes TypeInformation

commit 935ff90dadad380db293265cc49d072d764cf510
Author: Chen Qin 
Date:   2017-03-01T14:36:17Z

[FLINK-4460] Provide late-data output for window operations

We use side outputs to emit dropped late data.

commit 3ae8a673c6c29fe2e110f5745ddf72deae71aafd
Author: Aljoscha Krettek 
Date:   2017-03-07T10:06:31Z

[FLINK-4460] Add proper side output API for Scala API

The Scala side output API uses context bounds to get a TypeInformation
for an OutputTag. This also adds a SideOutputITCase for the Scala API.

commit 66182b79931016a04286279f5aba63af30442b02
Author: Aljoscha Krettek 
Date:   2017-03-07T11:06:01Z

[FLINK-4460] Add documentation for side outputs
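
The late-data commit above sends elements that a window operation would otherwise drop into a side output. Below is a toy model of that idea that does not depend on Flink. It assumes tumbling event-time windows. It also assumes a window counts as late once its last timestamp plus the allowed lateness is at or behind the current watermark; this is our reading, so check it against the actual window operator code. `LateDataRouter` and `Event` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical toy model, not Flink code: late elements go to a side list instead of being dropped.
public class LateDataRouter {

    /** A timestamped element (hypothetical type). */
    public static final class Event {
        public final String key;
        public final long timestamp;

        public Event(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            return key + "@" + timestamp;
        }
    }

    private final long windowSize;      // length of a tumbling event-time window
    private final long allowedLateness; // how long a window is kept after the watermark passes it
    private final List<Event> mainOutput = new ArrayList<>();
    private final List<Event> lateOutput = new ArrayList<>();

    public LateDataRouter(long windowSize, long allowedLateness) {
        this.windowSize = windowSize;
        this.allowedLateness = allowedLateness;
    }

    /** Routes one element to the main output or to the late-data "side output". */
    public void process(Event e, long currentWatermark) {
        long windowStart = e.timestamp - Math.floorMod(e.timestamp, windowSize);
        long windowMaxTimestamp = windowStart + windowSize - 1;
        long cleanupTime = windowMaxTimestamp + allowedLateness;
        if (cleanupTime <= currentWatermark) {
            lateOutput.add(e); // window already cleaned up: keep the element visible instead of dropping it
        } else {
            mainOutput.add(e);
        }
    }

    public List<Event> mainOutput() {
        return Collections.unmodifiableList(mainOutput);
    }

    public List<Event> lateOutput() {
        return Collections.unmodifiableList(lateOutput);
    }

    public static void main(String[] args) {
        LateDataRouter router = new LateDataRouter(10, 0);
        router.process(new Event("a", 3), 15);  // window [0, 10) is already past the watermark
        router.process(new Event("b", 12), 15); // window [10, 20) is still open
        System.out.println("main=" + router.mainOutput() + " late=" + router.lateOutput());
    }
}
```

With a window size of 10, no allowed lateness and a watermark of 15, `main` prints `main=[b@12] late=[a@3]`.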




[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897560#comment-15897560
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user wenlong88 commented on the issue:

https://github.com/apache/flink/pull/3438
  
Thanks for the explanation. I raised this concern because we recently
recommended that our users implement their jobs with ProcessFunction, so they
will need to change their code too when we sync the commit. After all, it is
really nice to have timers in more scenarios.


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897539#comment-15897539
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/3438
  
Merged


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897534#comment-15897534
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3438


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897532#comment-15897532
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3438#discussion_r104446345
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java
 ---
@@ -19,30 +19,35 @@
 package org.apache.flink.streaming.api.functions;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.streaming.api.TimeDomain;
 import org.apache.flink.streaming.api.TimerService;
 import org.apache.flink.util.Collector;
 
 /**
  * A function that processes elements of a stream.
  *
- * The function will be called for every element in the input stream and can produce
- * zero or more output. The function can also query the time and set timers. When
- * reacting to the firing of set timers the function can emit yet more elements.
+ * For every element in the input stream {@link #processElement(Object, Context, Collector)}
+ * is invoked. This can produce zero or more elements as output. Implementations can also
+ * query the time and set timers through the provided {@link Context}. For firing timers
+ * {@link #onTimer(long, OnTimerContext, Collector)} will be invoked. This can again produce
+ * zero or more elements as output and register further timers.
  *
- * The function will be called for every element in the input stream and can produce
- * zero or more output elements. Contrary to the
- * {@link org.apache.flink.api.common.functions.FlatMapFunction}, this function can also query
- * the time (both event and processing) and set timers, through the provided {@link Context}.
- * When reacting to the firing of set timers the function can directly emit a result, and/or
- * register a timer that will trigger an action in the future.
+ * NOTE: Access to keyed state and timers (which are also scoped to a key) is only
+ * available if the {@code ProcessFunction} is applied on a {@code KeyedStream}.
+ *
+ * NOTE: A {@code ProcessFunction} is always a
+ * {@link org.apache.flink.api.common.functions.RichFunction}. Therefore, access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} as always available and setup and
+ * teardown methods can be implemented. See
+ * {@link org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)}
+ * and {@link org.apache.flink.api.common.functions.RichFunction#close()}.
  *
  * @param  Type of the input elements.
  * @param  Type of the output elements.
  */
 @PublicEvolving
-public interface ProcessFunction extends Function {
+public abstract class ProcessFunction extends AbstractRichFunction {
--- End diff --

Hi @wenlong88, in the ML discussion
(https://lists.apache.org/thread.html/f3fe7d68986877994ad6b66173f40e72fc454420720a74ea5a834cc2@%3Cdev.flink.apache.org%3E)
we decided to make `ProcessFunction` available on non-keyed streams as well, to
allow using side outputs there. This requires turning `ProcessFunction` into an
abstract class so that `onTimer()` can have a default implementation; otherwise
every user would always have to implement it. We marked `ProcessFunction` as
`@PublicEvolving` precisely for such cases: it's still a very young API and we
didn't know exactly what was going to be needed in the end.
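
The design point above can be shown in plain Java. An abstract base class with a no-op default `onTimer()` means that functions on non-keyed streams only have to implement `processElement()`. This is a simplified, hypothetical model and not Flink's `ProcessFunction`. Contexts, timer registration and `RichFunction` are left out, and `java.util.function.Consumer` stands in for `Collector`. The model also declares a `serialVersionUID`, which the review above asks for, because function objects are assumed to be serialized.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class DefaultTimerDemo {

    /** Hypothetical simplified base class: only processElement() is abstract. */
    public abstract static class SimpleProcessFunction<I, O> implements Serializable {
        private static final long serialVersionUID = 1L;

        public abstract void processElement(I value, Consumer<O> out);

        /** No-op default, so functions that never register timers need not override it. */
        public void onTimer(long timestamp, Consumer<O> out) {
        }
    }

    /** A user function that implements only processElement(). */
    public static final class UpperCase extends SimpleProcessFunction<String, String> {
        private static final long serialVersionUID = 1L;

        @Override
        public void processElement(String value, Consumer<String> out) {
            out.accept(value.toUpperCase());
        }
    }

    /** Drives the function over some input, then fires one timer. */
    public static List<String> run(SimpleProcessFunction<String, String> fn, List<String> input, long timerTimestamp) {
        List<String> results = new ArrayList<>();
        for (String value : input) {
            fn.processElement(value, results::add);
        }
        fn.onTimer(timerTimestamp, results::add); // default implementation emits nothing
        return results;
    }

    public static void main(String[] args) {
        System.out.println(run(new UpperCase(), Arrays.asList("side", "output"), 42L));
    }
}
```

If `onTimer()` were abstract, or if it remained an interface method without a default, `UpperCase` would also need an empty `onTimer()` body.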


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897185#comment-15897185
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3438#discussion_r104399614
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
 ---
@@ -556,6 +558,60 @@ public ExecutionConfig getExecutionConfig() {
}
 
/**
+* Applies the given {@link ProcessFunction} on the input stream, thereby
+* creating a transformed output stream.
+*
+* The function will be called for every element in the input streams and can produce zero
+* or more output elements.
+*
+* @param processFunction The {@link ProcessFunction} that is called for each element
+*  in the stream.
+*
+* @param  The type of elements emitted by the {@code ProcessFunction}.
+*
+* @return The transformed {@link DataStream}.
+*/
+   @PublicEvolving
+   public  SingleOutputStreamOperator process(ProcessFunction processFunction) {
+
+   TypeInformation outType = TypeExtractor.getUnaryOperatorReturnType(
+   processFunction,
+   ProcessFunction.class,
+   false,
+   true,
+   getType(),
+   Utils.getCallLocationName(),
+   true);
+
+   return process(processFunction, outType);
+   }
+
+   /**
+* Applies the given {@link ProcessFunction} on the input stream, thereby
+* creating a transformed output stream.
+*
+* The function will be called for every element in the input streams and can produce zero
+* or more output elements.
+*
+* @param processFunction The {@link ProcessFunction} that is called for each element
+*  in the stream.
+* @param outputType {@link TypeInformation} for the result type of the function.
+*
+* @param  The type of elements emitted by the {@code ProcessFunction}.
+*
+* @return The transformed {@link DataStream}.
+*/
+   @Internal
+   public  SingleOutputStreamOperator process(
--- End diff --

Makes sense to keep it like that. The benefits of basing the Scala API on top
of the Java API instead of duplicating it are very persuasive, too. So +1 to
keep it as is. I was just wondering whether users would be confused by this.



[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897086#comment-15897086
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/3438
  
@uce There is some documentation that says that `ProcessFunction` is only 
available on keyed streams. I'll change that.


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897065#comment-15897065
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/3438
  
@uce There is no issue for 2.0 to track this because I don't think there
is consensus about always having timestamps.


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897064#comment-15897064
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3438#discussion_r104384206
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java
 ---
@@ -19,30 +19,35 @@
 package org.apache.flink.streaming.api.functions;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.streaming.api.TimeDomain;
 import org.apache.flink.streaming.api.TimerService;
 import org.apache.flink.util.Collector;
 
 /**
  * A function that processes elements of a stream.
  *
- * The function will be called for every element in the input stream and can produce
- * zero or more output. The function can also query the time and set timers. When
- * reacting to the firing of set timers the function can emit yet more elements.
+ * For every element in the input stream {@link #processElement(Object, Context, Collector)}
+ * is invoked. This can produce zero or more elements as output. Implementations can also
+ * query the time and set timers through the provided {@link Context}. For firing timers
+ * {@link #onTimer(long, OnTimerContext, Collector)} will be invoked. This can again produce
+ * zero or more elements as output and register further timers.
  *
- * The function will be called for every element in the input stream and can produce
- * zero or more output elements. Contrary to the
- * {@link org.apache.flink.api.common.functions.FlatMapFunction}, this function can also query
- * the time (both event and processing) and set timers, through the provided {@link Context}.
- * When reacting to the firing of set timers the function can directly emit a result, and/or
- * register a timer that will trigger an action in the future.
+ * NOTE: Access to keyed state and timers (which are also scoped to a key) is only
+ * available if the {@code ProcessFunction} is applied on a {@code KeyedStream}.
+ *
+ * NOTE: A {@code ProcessFunction} is always a
+ * {@link org.apache.flink.api.common.functions.RichFunction}. Therefore, access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} as always available and setup and
--- End diff --

fixing



[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897060#comment-15897060
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3438#discussion_r104383800
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
 ---
@@ -556,6 +558,60 @@ public ExecutionConfig getExecutionConfig() {
}
 
/**
+* Applies the given {@link ProcessFunction} on the input stream, thereby
+* creating a transformed output stream.
+*
+* The function will be called for every element in the input streams and can produce zero
+* or more output elements.
+*
+* @param processFunction The {@link ProcessFunction} that is called for each element
+*  in the stream.
+*
+* @param  The type of elements emitted by the {@code ProcessFunction}.
+*
+* @return The transformed {@link DataStream}.
+*/
+   @PublicEvolving
+   public  SingleOutputStreamOperator process(ProcessFunction processFunction) {
+
+   TypeInformation outType = TypeExtractor.getUnaryOperatorReturnType(
+   processFunction,
+   ProcessFunction.class,
+   false,
+   true,
+   getType(),
+   Utils.getCallLocationName(),
+   true);
+
+   return process(processFunction, outType);
+   }
+
+   /**
+* Applies the given {@link ProcessFunction} on the input stream, thereby
+* creating a transformed output stream.
+*
+* The function will be called for every element in the input streams and can produce zero
+* or more output elements.
+*
+* @param processFunction The {@link ProcessFunction} that is called for each element
+*  in the stream.
+* @param outputType {@link TypeInformation} for the result type of the function.
+*
+* @param  The type of elements emitted by the {@code ProcessFunction}.
+*
+* @return The transformed {@link DataStream}.
+*/
+   @Internal
+   public  SingleOutputStreamOperator process(
--- End diff --

Yes, it's exposed for that. The pattern so far is for methods to also expose
a public variant that takes a `TypeInformation`, because in the Scala API we
get the `TypeInformation` from the context bound.

Calling `transform()` manually is an option, but then the Scala API would no
longer be based on the Java API, and we would have code that instantiates the
stream operators in both the Java and the Scala API. For example, right now the
code for instantiating a flat map operator lives in `(Java)DataStream`, and
`(Scala)DataStream.flatMap()` calls that method.

What do you think?
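
Here is a minimal sketch of the overload pattern described above. It assumes that a plain `java.lang.reflect.Type` can stand in for `TypeInformation`. The one-argument `process()` infers the output type from the function's generic superclass, which is roughly the job type extraction does in the Java API. The two-argument overload takes the type explicitly, which is the entry point a Scala wrapper with a context bound would call. Both paths build the operator in one place, and avoiding a second copy of that code in the Scala API is the point of the pattern. `TypeOverloadDemo`, `MapFn` and `Operator` are hypothetical names.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Function;

public class TypeOverloadDemo {

    /** Hypothetical user-function base class; an abstract class keeps type arguments readable. */
    public abstract static class MapFn<I, O> implements Function<I, O> {
    }

    /** Hypothetical stand-in for an operator plus its declared output type. */
    public static final class Operator<I, O> {
        public final MapFn<I, O> fn;
        public final Type outputType;

        Operator(MapFn<I, O> fn, Type outputType) {
            this.fn = fn;
            this.outputType = outputType;
        }
    }

    /** Convenience overload for Java callers: infers the output type reflectively. */
    public static <I, O> Operator<I, O> process(MapFn<I, O> fn) {
        Type superType = fn.getClass().getGenericSuperclass();
        if (!(superType instanceof ParameterizedType)) {
            throw new IllegalArgumentException("Cannot infer output type; call process(fn, outputType)");
        }
        Type outType = ((ParameterizedType) superType).getActualTypeArguments()[1];
        return process(fn, outType);
    }

    /** Explicit-type overload, e.g. for a wrapper that already knows the type. */
    public static <I, O> Operator<I, O> process(MapFn<I, O> fn, Type outputType) {
        // The only place the "operator" is constructed; both entry points share it.
        return new Operator<>(fn, outputType);
    }

    public static void main(String[] args) {
        MapFn<String, Integer> length = new MapFn<String, Integer>() {
            @Override
            public Integer apply(String s) {
                return s.length();
            }
        };
        Operator<String, Integer> op = process(length);
        System.out.println(op.outputType.getTypeName() + " " + op.fn.apply("flink"));
    }
}
```

Running `main` prints `java.lang.Integer 5`.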


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897031#comment-15897031
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user uce commented on the issue:

https://github.com/apache/flink/pull/3438
  
@rmetzger @aljoscha I agree with Aljoscha that your point is
independent of this PR. Is there an issue for 2.0 to track this?



[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897029#comment-15897029
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3438#discussion_r104377167
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
 ---
@@ -556,6 +558,60 @@ public ExecutionConfig getExecutionConfig() {
}
 
/**
+* Applies the given {@link ProcessFunction} on the input stream, thereby
+* creating a transformed output stream.
+*
+* The function will be called for every element in the input streams and can produce zero
+* or more output elements.
+*
+* @param processFunction The {@link ProcessFunction} that is called for each element
+*  in the stream.
+*
+* @param  The type of elements emitted by the {@code ProcessFunction}.
+*
+* @return The transformed {@link DataStream}.
+*/
+   @PublicEvolving
+   public  SingleOutputStreamOperator process(ProcessFunction processFunction) {
+
+   TypeInformation outType = TypeExtractor.getUnaryOperatorReturnType(
+   processFunction,
+   ProcessFunction.class,
+   false,
+   true,
+   getType(),
+   Utils.getCallLocationName(),
+   true);
+
+   return process(processFunction, outType);
+   }
+
+   /**
+* Applies the given {@link ProcessFunction} on the input stream, thereby
+* creating a transformed output stream.
+*
+* The function will be called for every element in the input streams and can produce zero
+* or more output elements.
+*
+* @param processFunction The {@link ProcessFunction} that is called for each element
+*  in the stream.
+* @param outputType {@link TypeInformation} for the result type of the function.
+*
+* @param  The type of elements emitted by the {@code ProcessFunction}.
+*
+* @return The transformed {@link DataStream}.
+*/
+   @Internal
+   public  SingleOutputStreamOperator process(
--- End diff --

Is this internal method only exposed as `public` for the Scala API? If yes, 
I'm wondering if it makes sense to call `transform` manually in the Scala 
`DataStream` API.




[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897028#comment-15897028
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3438#discussion_r104378368
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java
 ---
@@ -19,30 +19,35 @@
 package org.apache.flink.streaming.api.functions;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.streaming.api.TimeDomain;
 import org.apache.flink.streaming.api.TimerService;
 import org.apache.flink.util.Collector;
 
 /**
  * A function that processes elements of a stream.
  *
- * The function will be called for every element in the input stream and can produce
- * zero or more output. The function can also query the time and set timers. When
- * reacting to the firing of set timers the function can emit yet more elements.
+ * For every element in the input stream {@link #processElement(Object, Context, Collector)}
+ * is invoked. This can produce zero or more elements as output. Implementations can also
+ * query the time and set timers through the provided {@link Context}. For firing timers
+ * {@link #onTimer(long, OnTimerContext, Collector)} will be invoked. This can again produce
+ * zero or more elements as output and register further timers.
  *
- * The function will be called for every element in the input stream and can produce
- * zero or more output elements. Contrary to the
- * {@link org.apache.flink.api.common.functions.FlatMapFunction}, this function can also query
- * the time (both event and processing) and set timers, through the provided {@link Context}.
- * When reacting to the firing of set timers the function can directly emit a result, and/or
- * register a timer that will trigger an action in the future.
+ * NOTE: Access to keyed state and timers (which are also scoped to a key) is only
+ * available if the {@code ProcessFunction} is applied on a {@code KeyedStream}.
+ *
+ * NOTE: A {@code ProcessFunction} is always a
+ * {@link org.apache.flink.api.common.functions.RichFunction}. Therefore, access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} as always available and setup and
+ * teardown methods can be implemented. See
+ * {@link org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)}
+ * and {@link org.apache.flink.api.common.functions.RichFunction#close()}.
  *
  * @param  Type of the input elements.
  * @param  Type of the output elements.
  */
 @PublicEvolving
-public interface ProcessFunction extends Function {
+public abstract class ProcessFunction extends AbstractRichFunction {
--- End diff --

Missing serialVersionUID


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897027#comment-15897027
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3438#discussion_r104377392
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java
 ---
@@ -19,30 +19,35 @@
 package org.apache.flink.streaming.api.functions;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.streaming.api.TimeDomain;
 import org.apache.flink.streaming.api.TimerService;
 import org.apache.flink.util.Collector;
 
 /**
  * A function that processes elements of a stream.
  *
- * The function will be called for every element in the input stream and can produce
- * zero or more output. The function can also query the time and set timers. When
- * reacting to the firing of set timers the function can emit yet more elements.
+ * For every element in the input stream {@link #processElement(Object, Context, Collector)}
+ * is invoked. This can produce zero or more elements as output. Implementations can also
+ * query the time and set timers through the provided {@link Context}. For firing timers
+ * {@link #onTimer(long, OnTimerContext, Collector)} will be invoked. This can again produce
+ * zero or more elements as output and register further timers.
  *
- * The function will be called for every element in the input stream and can produce
- * zero or more output elements. Contrary to the
- * {@link org.apache.flink.api.common.functions.FlatMapFunction}, this function can also query
- * the time (both event and processing) and set timers, through the provided {@link Context}.
- * When reacting to the firing of set timers the function can directly emit a result, and/or
- * register a timer that will trigger an action in the future.
+ * NOTE: Access to keyed state and timers (which are also scoped to a key) is only
+ * available if the {@code ProcessFunction} is applied on a {@code KeyedStream}.
+ *
+ * NOTE: A {@code ProcessFunction} is always a
+ * {@link org.apache.flink.api.common.functions.RichFunction}. Therefore, access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} as always available and setup and
--- End diff --

typo: as => is?


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1589#comment-1589
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user wenlong88 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3438#discussion_r104334699
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java
 ---
@@ -19,30 +19,35 @@
 package org.apache.flink.streaming.api.functions;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.streaming.api.TimeDomain;
 import org.apache.flink.streaming.api.TimerService;
 import org.apache.flink.util.Collector;
 
 /**
  * A function that processes elements of a stream.
  *
- * The function will be called for every element in the input stream and can produce
- * zero or more output. The function can also query the time and set timers. When
- * reacting to the firing of set timers the function can emit yet more elements.
+ * For every element in the input stream {@link #processElement(Object, Context, Collector)}
+ * is invoked. This can produce zero or more elements as output. Implementations can also
+ * query the time and set timers through the provided {@link Context}. For firing timers
+ * {@link #onTimer(long, OnTimerContext, Collector)} will be invoked. This can again produce
+ * zero or more elements as output and register further timers.
  *
- * The function will be called for every element in the input stream and can produce
- * zero or more output elements. Contrary to the
- * {@link org.apache.flink.api.common.functions.FlatMapFunction}, this function can also query
- * the time (both event and processing) and set timers, through the provided {@link Context}.
- * When reacting to the firing of set timers the function can directly emit a result, and/or
- * register a timer that will trigger an action in the future.
+ * NOTE: Access to keyed state and timers (which are also scoped to a key) is only
+ * available if the {@code ProcessFunction} is applied on a {@code KeyedStream}.
+ *
+ * NOTE: A {@code ProcessFunction} is always a
+ * {@link org.apache.flink.api.common.functions.RichFunction}. Therefore, access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} as always available and setup and
+ * teardown methods can be implemented. See
+ * {@link org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)}
+ * and {@link org.apache.flink.api.common.functions.RichFunction#close()}.
  *
  * @param  Type of the input elements.
  * @param  Type of the output elements.
  */
 @PublicEvolving
-public interface ProcessFunction extends Function {
+public abstract class ProcessFunction extends AbstractRichFunction {
--- End diff --

Hi, changing from `interface` to `class` is incompatible on the user side. Can't `ProcessFunction` just extend `RichFunction`?


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895742#comment-15895742
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/3438
  
I think the discussion of timestamps and additional interfaces is orthogonal to this PR: `KeyedProcessOperator` is a renaming of the pre-existing `ProcessOperator`, and the new `ProcessOperator` is a simplification that does away with timers. The interface for timestamps exists in the current code base; if we want to change that, we should open other Jira issues.


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895737#comment-15895737
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/3438
  
In addition to throwing an exception, we should also expose 
`element.hasTimestamp()` to offer our users a clean way of checking for 
timestamps.
Let's see what @uce or other reviewers think about this.
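The options discussed in this part of the thread (a boxed `Long` that may be `null`, an exception on access, or an explicit `hasTimestamp()` check) can be sketched in plain Java. The class below is hypothetical and is not Flink's `StreamRecord` or `Context` API; it only illustrates storing a primitive `long` plus a flag, so callers either check `hasTimestamp()` first or get an exception instead of a `null`, with no autoboxing involved.

```java
public final class TimestampAccess {

    // Hypothetical element wrapper (not a Flink class): the timestamp is a
    // primitive long plus an explicit flag, so no Long boxing is needed.
    static final class TimestampedElement<T> {
        private final T value;
        private final long timestamp;
        private final boolean hasTimestamp;

        private TimestampedElement(T value, long timestamp, boolean hasTimestamp) {
            this.value = value;
            this.timestamp = timestamp;
            this.hasTimestamp = hasTimestamp;
        }

        static <T> TimestampedElement<T> withTimestamp(T value, long timestamp) {
            return new TimestampedElement<>(value, timestamp, true);
        }

        static <T> TimestampedElement<T> withoutTimestamp(T value) {
            // The stored long is meaningless here; hasTimestamp guards access to it.
            return new TimestampedElement<>(value, Long.MIN_VALUE, false);
        }

        T getValue() {
            return value;
        }

        // Allocation-free way for users to check before reading the timestamp.
        boolean hasTimestamp() {
            return hasTimestamp;
        }

        // Throws instead of returning null when the element carries no timestamp.
        long getTimestamp() {
            if (!hasTimestamp) {
                throw new IllegalStateException("Element " + value + " has no timestamp");
            }
            return timestamp;
        }
    }

    public static void main(String[] args) {
        TimestampedElement<String> stamped = TimestampedElement.withTimestamp("a", 42L);
        TimestampedElement<String> unstamped = TimestampedElement.withoutTimestamp("b");
        for (TimestampedElement<String> e : java.util.Arrays.asList(stamped, unstamped)) {
            String ts = e.hasTimestamp() ? Long.toString(e.getTimestamp()) : "none";
            System.out.println(e.getValue() + " -> " + ts); // "a -> 42", then "b -> none"
        }
    }
}
```

Combining both styles keeps the fast path free of boxing while still failing loudly if someone reads a timestamp that was never assigned.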


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895553#comment-15895553
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/3438
  
@rmetzger Yes, it's unfortunate that in our model not all elements always 
have a timestamp. The other alternative is throwing an exception when trying to 
access a non-existing timestamp.


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15894999#comment-15894999
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/3438
  
I looked over the changes and didn't find anything critical. The only thing that made me think was the boxed `Long` type for `timestamp()`. I assume you decided on this approach to signal timestamp unavailability using `null`. The Java documentation recommends against relying on autoboxing in performance-critical code: 
http://docs.oracle.com/javase/1.5.0/docs/guide/language/autoboxing.html

The tests and the Scala API were done. I assume that we don't need to explicitly mention support for the process function on non-keyed streams.


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15891296#comment-15891296
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user chenqin commented on the issue:

https://github.com/apache/flink/pull/2982
  
@aljoscha 
Nice! Let's do "Chen Qin qinnc...@gmail.com"


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15890301#comment-15890301
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2982
  
@chenqin Would you like your commits to be attributed to "Chen Qin " or "Chen Qin "? I see both in your set of commits.

I'm finally putting everything together and hopefully merging soon since 
the mailing list discussion seems to favour our approach.  


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15890141#comment-15890141
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/3438
  
R: @uce or @rmetzger for review, please


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-03-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15890140#comment-15890140
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/3438

[FLINK-4460] Allow ProcessFunction on non-keyed streams

This is in preparation for side outputs, which will only work on 
`ProcessFunction`. We still want side outputs on non-keyed streams so we have 
to make `ProcessFunction` available there.

See this ML thread for reference: 
https://lists.apache.org/thread.html/f3fe7d68986877994ad6b66173f40e72fc454420720a74ea5a834cc2@%3Cdev.flink.apache.org%3E

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink jira-4460-process-for-everyone

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3438.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3438


commit 38f33f2399598b521a7e34e8dea1d236f5672042
Author: Aljoscha Krettek 
Date:   2017-03-01T09:57:12Z

[FLINK-4460] Make ProcessFunction abstract, add default onTime() method

This is in preparation of allowing ProcessFunction on DataStream because
we will use it to allow side outputs from the ProcessFunction Context.

commit f0dd2c0d81a847cfa4f3d241ce874db6807caee2
Author: Aljoscha Krettek 
Date:   2017-03-01T10:41:02Z

[FLINK-4660] Allow ProcessFunction on DataStream

Introduce new ProcessOperator for this. Rename the pre-existing
ProcessOperator to KeyedProcessOperator.

commit 29ca9b4b794522b35d84d9f19edc5b0bb9f64912
Author: Aljoscha Krettek 
Date:   2017-03-01T10:33:03Z

[FLINK-4460] Make CoProcessFunction abstract, add default onTime() method

This is in preparation of allowing CoProcessFunction on a non-keyed
connected stream.  we will use it to allow side outputs from the
ProcessFunction Context.

commit a26accf8feebaf9d3566055c0d1eb3006986bfd6
Author: Aljoscha Krettek 
Date:   2017-03-01T11:02:34Z

[FLINK-4660] Allow CoProcessFunction on non-keyed ConnectedStreams

Introduce new CoProcessOperator for this. Rename the pre-existing
CoProcessOperator to KeyedCoProcessOperator.
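The commits above turn `ProcessFunction` into an abstract class with a default `onTimer()`, so that functions used on non-keyed streams, where timers are not available, only need to implement `processElement()`. A minimal, self-contained sketch of that pattern follows. `SimpleProcessFunction`, `SimpleCollector`, `Tokenizer`, and `run` are hypothetical stand-ins written for illustration, not Flink classes; the real methods also receive a `Context` or `OnTimerContext`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public final class ProcessFunctionPattern {

    // Hypothetical stand-in for org.apache.flink.util.Collector.
    interface SimpleCollector<T> {
        void collect(T record);
    }

    // Hypothetical, simplified base class (not Flink's ProcessFunction):
    // processElement() is abstract, onTimer() has a default no-op body.
    abstract static class SimpleProcessFunction<I, O> {
        abstract void processElement(I value, SimpleCollector<O> out);

        // Functions that never register timers (e.g. on non-keyed streams)
        // do not need to override this.
        void onTimer(long timestamp, SimpleCollector<O> out) {
        }
    }

    // A user function that only implements processElement().
    static final class Tokenizer extends SimpleProcessFunction<String, String> {
        @Override
        void processElement(String line, SimpleCollector<String> out) {
            for (String token : line.split("\\s+")) {
                if (!token.isEmpty()) {
                    out.collect(token);
                }
            }
        }
    }

    // Drives a function over a finite input, then fires one timer.
    static List<String> run(SimpleProcessFunction<String, String> fn, List<String> input) {
        List<String> results = new ArrayList<>();
        SimpleCollector<String> out = results::add;
        for (String line : input) {
            fn.processElement(line, out);
        }
        fn.onTimer(0L, out); // the inherited default emits nothing
        return results;
    }

    public static void main(String[] args) {
        // prints [side, outputs, in, flink]
        System.out.println(run(new Tokenizer(), Arrays.asList("side outputs", "in flink")));
    }
}
```

The trade-off raised in review applies here too: with an abstract class, existing user code that `implements` the old interface no longer compiles.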




[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15880467#comment-15880467
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2982
  
Thanks for looking at that! I'll open a new discussion thread on the mailing lists to discuss Side Outputs and split/select and how we're going to proceed with that.

Regarding your other questions: I think we might add such an `Evaluator` 
interface in the future but for now I would like to keep it simple and see if 
that works for people. And yes, a user would have to use `allowedLateness` and 
`sideOutputLateData` at the same time if they want to use late data, or they 
can go with the default allowed lateness of zero and also get the late data as 
a side output.
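The interplay of `allowedLateness` and the late-data side output can be illustrated with a simplified lateness rule, assuming that an event-time window's state is kept until the watermark passes the window's maximum timestamp plus the allowed lateness. This is a hypothetical sketch (class and method names are made up), not Flink's `WindowOperator` code. With the default allowed lateness of zero, elements for a window become late as soon as the watermark reaches the window's last timestamp.

```java
public final class LateDataCheck {

    // For a window [start, end), the last timestamp that belongs to it.
    static long maxTimestamp(long windowEnd) {
        return windowEnd - 1;
    }

    // Simplified rule (assumption, not Flink's WindowOperator code): the window's
    // state is kept until the watermark passes maxTimestamp + allowedLateness.
    // After that, new elements for the window are late and would be dropped,
    // or emitted to a late-data side output if one was requested.
    static boolean isWindowLate(long windowEnd, long allowedLateness, long currentWatermark) {
        return maxTimestamp(windowEnd) + allowedLateness <= currentWatermark;
    }

    public static void main(String[] args) {
        long windowEnd = 10_000L; // window covers [0, 10000)
        System.out.println(isWindowLate(windowEnd, 0L, 9_998L));     // false
        System.out.println(isWindowLate(windowEnd, 0L, 9_999L));     // true: default lateness of zero
        System.out.println(isWindowLate(windowEnd, 5_000L, 9_999L)); // false: within allowed lateness
    }
}
```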


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-02-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15872353#comment-15872353
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user chenqin commented on the issue:

https://github.com/apache/flink/pull/2982
  
@aljoscha Looks good to me 

I briefly looked at your git branch. A minor comment would be to add comments to `sideOutputLateData` so that users get a better idea of what they opt in to with the late-arriving event stream.

Currently, whether an event arrives late is decided by comparing the watermark and the event time. Do you think there is a need to let users pass a kind of `Evaluator`, enabling them to emit any kind of side output?

`window.sideOutput(OutputTag, Evaluator)`

`interface Evaluator{ MergedWindows, key, watermark}`

- Regarding `split`/`select`, I think there is a chance to consolidate `select` and build it upon `OutputTag`, but that might be out of this PR's scope.
- Regarding `WindowedStream`, I am a bit confused about whether I need to use `allowedLateness` and `sideOutputLateData` at the same time.

Thanks,
Chen
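The `Evaluator` idea proposed above could look roughly like the following sketch. Everything here is hypothetical: the interface name, the method `shouldSideOutput`, and its parameters (window max timestamp, key, watermark) are guesses at the proposal's shape, not an existing Flink API. The default evaluator reproduces the plain watermark comparison; a custom one can add other routing criteria.

```java
public final class EvaluatorSketch {

    // Hypothetical interface modeled on the proposal; not a Flink API.
    interface Evaluator<K> {
        boolean shouldSideOutput(long windowMaxTimestamp, K key, long watermark);
    }

    // Default behaviour: only compare the window's event time with the watermark.
    static <K> Evaluator<K> watermarkBased() {
        return (windowMaxTimestamp, key, watermark) -> windowMaxTimestamp <= watermark;
    }

    public static void main(String[] args) {
        Evaluator<String> byWatermark = watermarkBased();
        // A user-supplied rule could additionally route a specific key.
        Evaluator<String> debugKey = (maxTs, key, wm) -> "debug".equals(key) || maxTs <= wm;

        System.out.println(byWatermark.shouldSideOutput(9_999L, "a", 10_000L));   // true
        System.out.println(byWatermark.shouldSideOutput(20_000L, "a", 10_000L));  // false
        System.out.println(debugKey.shouldSideOutput(20_000L, "debug", 10_000L)); // true
    }
}
```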





[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-02-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15872032#comment-15872032
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2982
  
@chenqin 

I rebased and consolidated your PR a bit and played around with the APIs.

Some of the changes:
 - Separation into internal changes/window operator changes and 
user-function changes. I have a prototype commit that exposes side outputs 
using `ProcessFunction`.
 - I removed `CollectorWrapper`/`RichCollector` in favour of the 
`ProcessFunction` approach
 - The internal implementation now doesn't store the `OutputTag` on the 
`StreamRecord` but instead adds an additional method on `Output` that should be 
used for emitting data to a side output. Side outputs now also work with 
chaining.
 - `WindowedStream` is changed to add a `sideOutputLateData()` method that 
is used to specify that late data side output is required. This is more general 
than putting it into the method signature of `apply()` because it will simply 
work for all different window types.
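The internal change described in the list above (emitting side-output data through an additional method on `Output` instead of storing the `OutputTag` on each `StreamRecord`) can be illustrated with a small buffering sketch. The `OutputTag`, `Output`, and `BufferingOutput` types below are simplified, hypothetical versions written for this example, with tags compared by id only; they are not Flink's actual classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public final class SideOutputRouting {

    // Hypothetical tag type; two tags are equal if their ids are equal.
    static final class OutputTag<T> {
        final String id;

        OutputTag(String id) {
            this.id = Objects.requireNonNull(id);
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof OutputTag && ((OutputTag<?>) o).id.equals(id);
        }

        @Override
        public int hashCode() {
            return id.hashCode();
        }
    }

    // Hypothetical operator output: the main output plus a separate method
    // for side outputs, so records themselves never carry a tag.
    interface Output<T> {
        void collect(T record);

        <X> void collect(OutputTag<X> tag, X record);
    }

    // Buffers the main output and each side output in memory.
    static final class BufferingOutput<T> implements Output<T> {
        final List<T> main = new ArrayList<>();
        final Map<OutputTag<?>, List<Object>> side = new HashMap<>();

        @Override
        public void collect(T record) {
            main.add(record);
        }

        @Override
        public <X> void collect(OutputTag<X> tag, X record) {
            side.computeIfAbsent(tag, t -> new ArrayList<>()).add(record);
        }

        // Unchecked cast: assumes each tag id is only ever used with one element type.
        @SuppressWarnings("unchecked")
        <X> List<X> getSideOutput(OutputTag<X> tag) {
            List<Object> records = side.getOrDefault(tag, new ArrayList<>());
            return (List<X>) (List<?>) records;
        }
    }

    public static void main(String[] args) {
        OutputTag<Integer> negatives = new OutputTag<>("negatives");
        BufferingOutput<Integer> out = new BufferingOutput<>();
        for (int v : new int[] {3, -1, 4, -5}) {
            if (v < 0) {
                out.collect(negatives, v);
            } else {
                out.collect(v);
            }
        }
        System.out.println(out.main);                     // [3, 4]
        System.out.println(out.getSideOutput(negatives)); // [-1, -5]
    }
}
```

Keeping the tag out of the record means the main-output path is unchanged, which is one reason this approach composes well with operator chaining.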

I quickly talked to @StephanEwen and we agreed that we need to think further about how we want to expose side outputs for user-defined functions. In particular, we have to think about what this addition means for the `split()`/`select()` pattern. I will also make another change where `sideOutputLateData()` is not required but will instead be added when the late-data stream is requested.

What do you think?


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2017-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15868176#comment-15868176
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user chenqin commented on the issue:

https://github.com/apache/flink/pull/2982
  
@aljoscha, 

The diff has been updated and merged to support `apply(Function, lateElementsOutput);`.
A `Collector` refactor does not seem to be a viable option, as stated above.

It seems Travis CI timed out here; is there a way to increase the timeout setting?
https://travis-ci.org/chenqin/flink/builds/201329246

Chen



[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2016-12-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15746680#comment-15746680
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user chenqin commented on the issue:

https://github.com/apache/flink/pull/2982
  
@aljoscha Thanks for your time. We can chat more after the 1.2 release!

I think it makes sense to extend `Collector`, even though we may not be able to remove `collect(T obj)` due to API compatibility in 1.x. (See @fhueske's comments in the [FLIP-13 email thread](http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Discuss-FLIP-13-Side-Outputs-in-Flink-td14204.html).)

The only point I would like to add: replacing the underlying output chains with `collect(tag, element)` seems to require a decent amount of refactoring, but it seems a reasonable investment going forward (multiple inputs / multiple outputs).

The `tooLateEvents()` method was added for the user's convenience; it should be fine to remove it if it doesn't bring much benefit. `LateArrivingTag` shares the same type as the input (which is already fixed once the input type is defined), so adding the late-arriving tag to the `apply()` method seems redundant. In fact, without any changes to this diff, users can already access late-arriving events in the following way:

```
OutputTag lateElementsOutput = new LateArrivingOutputTag();
DataStream input = ...
SingleOutputStreamOperator windowed = input
  .keyBy(...)
  .window(...)
  .apply(Function);

DataStream lateElements = windowed.getSideOutput(lateElementsOutput);
```

Thanks,
Chen



> Side Outputs in Flink
> -
>
> Key: FLINK-4460
> URL: https://issues.apache.org/jira/browse/FLINK-4460
> Project: Flink
>  Issue Type: New Feature
>  Components: Core, DataStream API
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Chen Qin
>  Labels: latearrivingevents, sideoutput
>
> https://docs.google.com/document/d/1vg1gpR8JL4dM07Yu4NyerQhhVvBlde5qdqnuJv4LcV4/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2016-12-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15745298#comment-15745298
 ] 

ASF GitHub Bot commented on FLINK-4460:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2982
  
@chenqin I had a quick look at the implementation and it looks quite good. 
I'll look at it in more detail once the 1.2 release is out and then I'll also 
have more thorough comments.

These are some quick comments off the top of my head:
 - I think we can extend `Collector` with a `collect(OutputTag, T)` method. 
Then we wouldn't need the extra `RichCollector` and `CollectorWrapper` to work 
around that.
 - For `WindowedStream` I would like to have something like this:
```
OutputTag lateElementsOutput = ...;
DataStream input = ...
SingleOutputStreamOperator windowed = input
  .keyBy(...)
  .window(...)
  .apply(Function, lateElementsOutput);

DataStream lateElements = windowed.getSideOutput(lateElementsOutput);
```
or maybe something else if we find a better idea. With `WindowedStream.tooLateElements()`, we would instantiate an extra `WindowOperator` just for getting late elements, while another window operator would be responsible for processing the actual elements. That seems wasteful.

What do you think?

