[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-18 Thread aljoscha
Github user aljoscha closed the pull request at:

https://github.com/apache/flink/pull/3484


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r106685158
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java ---
@@ -48,15 +49,25 @@
 * output selection).
 */
private final List selectedNames;
+
+   /**
+* The side-output tag (if any) of this {@link StreamEdge}.
+*/
+   private final OutputTag outputTag;
+
+   /**
+* The {@link StreamPartitioner} on this {@link StreamEdge}.
+*/
private StreamPartitioner outputPartitioner;
 
public StreamEdge(StreamNode sourceVertex, StreamNode targetVertex, int typeNumber,
-   List selectedNames, StreamPartitioner outputPartitioner) {
+   List selectedNames, StreamPartitioner outputPartitioner, OutputTag outputTag) {
this.sourceVertex = sourceVertex;
this.targetVertex = targetVertex;
this.typeNumber = typeNumber;
this.selectedNames = selectedNames;
this.outputPartitioner = outputPartitioner;
+   this.outputTag = outputTag;
 
--- End diff --

I'm not sure exactly what the edge id does or who uses it, so I'd prefer not to
touch it for now.


[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-17 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r106684413
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---
@@ -567,6 +600,17 @@ protected boolean isLate(W window) {
}
 
/**
+* Decide if a record is currently late, based on current watermark and allowed lateness.
+*
+* @param element The element to check
+* @return The element for which should be considered when sideoutputs
+*/
+   protected boolean isLate(StreamRecord element){
--- End diff --

That is what I remember as well.


[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r106684096
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---
@@ -567,6 +600,17 @@ protected boolean isLate(W window) {
}
 
/**
+* Decide if a record is currently late, based on current watermark and allowed lateness.
+*
+* @param element The element to check
+* @return The element for which should be considered when sideoutputs
+*/
+   protected boolean isLate(StreamRecord element){
--- End diff --

I must have removed the check by accident. I think we agreed to rename this 
to something more meaningful and keep it, right?
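For reference while renaming it, the per-element check under discussion can be modelled in plain Java. This is a minimal sketch that assumes the rule is "event time, and element timestamp + allowed lateness <= current watermark"; `LatenessCheck` and `isElementLate` are hypothetical names, not the code in this PR.

```java
// Hypothetical model of a per-element lateness check (not Flink's WindowOperator).
final class LatenessCheck {

    /**
     * Returns true if an element should be treated as late: only for event-time
     * windowing, and only once the watermark has reached the element's timestamp
     * plus the allowed lateness.
     */
    static boolean isElementLate(boolean isEventTime, long elementTimestamp,
                                 long allowedLateness, long currentWatermark) {
        return isEventTime && elementTimestamp + allowedLateness <= currentWatermark;
    }

    public static void main(String[] args) {
        // Watermark at 100, allowed lateness 10.
        System.out.println(isElementLate(true, 95, 10, 100));  // false: 105 > 100
        System.out.println(isElementLate(true, 90, 10, 100));  // true: 100 <= 100
        System.out.println(isElementLate(false, 0, 0, 100));   // false: not event time
    }
}
```

Keeping the check as a small named method makes it easy to reuse for both dropping and side-outputting late elements.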


[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-15 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r106162497
  
--- Diff: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/sideoutput/SideOutputExample.java ---
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.sideoutput;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.Collector;
+
+/**
+ * An example that illustrates the use of side outputs.
+ *
+ * This is a modified version of {@link org.apache.flink.streaming.examples.windowing.WindowWordCount}
+ * that has a filter in the tokenizer and only emits some words for counting
+ * while emitting the other words to a side output.
+ */
+public class SideOutputExample {
+
+   /**
+* We need to create an {@link OutputTag} so that we can reference it when emitting
+* data to a side output and also to retrieve the side output stream from an operation.
+*/
+   static final OutputTag rejectedWordsTag = new OutputTag("rejected") {};
--- End diff --

Here we add a side output but do nothing to show that it works. We could
prefix each rejected word with "rejected-" and print it, so that the user can
see what the side output does.
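A plain-Java model of the suggestion, for illustration only: words that fail a filter go to a "side output" list, and that list is printed with a "rejected-" prefix. The `length() > 5` filter and all class and method names are hypothetical stand-ins; in the Flink job itself the routing would go through `ctx.output(rejectedWordsTag, ...)` and the printing through `getSideOutput(rejectedWordsTag)`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the example's tokenizer with a side output.
final class RejectedWordsDemo {

    /** Splits the text into words; short words go to main, long words to rejected. */
    static void tokenize(String text, List<String> main, List<String> rejected) {
        for (String token : text.toLowerCase().split("\\W+")) {
            if (token.isEmpty()) {
                continue;
            }
            if (token.length() > 5) {
                rejected.add(token);   // in Flink: ctx.output(rejectedWordsTag, token)
            } else {
                main.add(token);       // in Flink: out.collect(...)
            }
        }
    }

    /** Mirrors a map step on the side-output stream that makes rejected words visible. */
    static List<String> prefixRejected(List<String> rejected) {
        List<String> result = new ArrayList<>();
        for (String word : rejected) {
            result.add("rejected-" + word);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> main = new ArrayList<>();
        List<String> rejected = new ArrayList<>();
        tokenize("To be, or not to be: that is the question", main, rejected);
        System.out.println(main);
        prefixRejected(rejected).forEach(System.out::println);  // rejected-question
    }
}
```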


[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-15 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r106163045
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java ---
@@ -333,32 +373,39 @@ public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int ty
downStreamVertexID,
typeNumber,
null,
-   new ArrayList());
+   new ArrayList(), null);
--- End diff --

The `null` should go to the next line for uniformity.


[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-15 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r106162749
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java ---
@@ -48,15 +49,25 @@
 * output selection).
 */
private final List selectedNames;
+
+   /**
+* The side-output tag (if any) of this {@link StreamEdge}.
+*/
+   private final OutputTag outputTag;
+
+   /**
+* The {@link StreamPartitioner} on this {@link StreamEdge}.
+*/
private StreamPartitioner outputPartitioner;
 
public StreamEdge(StreamNode sourceVertex, StreamNode targetVertex, int typeNumber,
-   List selectedNames, StreamPartitioner outputPartitioner) {
+   List selectedNames, StreamPartitioner outputPartitioner, OutputTag outputTag) {
this.sourceVertex = sourceVertex;
this.targetVertex = targetVertex;
this.typeNumber = typeNumber;
this.selectedNames = selectedNames;
this.outputPartitioner = outputPartitioner;
+   this.outputTag = outputTag;
 
--- End diff --

Does it make sense to also include the outputTag in the `edgeId`?
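To illustrate why the tag might belong in the id, here is a hypothetical sketch; the id format and `EdgeIdDemo` are invented for illustration and are not taken from `StreamEdge`. Two edges between the same vertices that differ only in the side output they carry would otherwise get the same id.

```java
import java.util.Objects;

// Hypothetical edge-id builder; not StreamEdge's actual id format.
final class EdgeIdDemo {

    /** Builds an id from the endpoints, type number and (optional) side-output tag id. */
    static String edgeId(int sourceId, int targetId, int typeNumber, String outputTagId) {
        // "(main)" marks edges that carry the main output rather than a side output.
        return sourceId + "_" + targetId + "_" + typeNumber + "_"
            + Objects.toString(outputTagId, "(main)");
    }

    public static void main(String[] args) {
        // Same endpoints and type number; only the output tag differs.
        System.out.println(edgeId(1, 2, 0, null));        // 1_2_0_(main)
        System.out.println(edgeId(1, 2, 0, "rejected"));  // 1_2_0_rejected
    }
}
```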


[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-15 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r106163521
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java ---
@@ -63,6 +66,7 @@
private TypeSerializer typeSerializerIn1;
private TypeSerializer typeSerializerIn2;
private TypeSerializer typeSerializerOut;
+   private Map typeSerializerMap;
--- End diff --

This is not used anywhere in the code. Can it be removed, along with the 
`getTypeSerializerOut()` and `setTypeSerializerOut()`?


[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-15 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r106139593
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ---
@@ -441,26 +486,53 @@ public void close() {
private static final class CopyingChainingOutput extends ChainingOutput {

private final TypeSerializer serializer;
-   
+
public CopyingChainingOutput(
OneInputStreamOperator operator,
TypeSerializer serializer,
+   OutputTag outputTag,
StreamStatusProvider streamStatusProvider) {
-   super(operator, streamStatusProvider);
+   super(operator, streamStatusProvider, outputTag);
this.serializer = serializer;
}
 
@Override
public void collect(StreamRecord record) {
+   if (this.outputTag != null) {
+   // we are only responsible for emitting to the main input
+   return;
+   }
+
+   pushToOperator(record);
+   }
+
+   @Override
+   public  void collect(OutputTag outputTag, StreamRecord record) {
+   if (this.outputTag == null || !this.outputTag.equals(outputTag)) {
+   // we are only responsible for emitting to the side-output specified by our
+   // OutputTag.
+   return;
+   }
+
+   pushToOperator(record);
+   }
+
+   @Override
+   protected  void pushToOperator(StreamRecord record) {
try {
+   // we know that the given outputTag matches our OutputTag so the record
+   // must be of the type that our operator (and Serializer) expects.
+   @SuppressWarnings("unchecked")
+   StreamRecord castRecord = (StreamRecord) record;
+
numRecordsIn.inc();
-   StreamRecord copy = record.copy(serializer.copy(record.getValue()));
+   StreamRecord copy = castRecord.copy(serializer.copy(castRecord.getValue()));
operator.setKeyContextElement1(copy);
-   operator.processElement(copy);
-   }
-   catch (Exception e) {
+   operator.processElement(castRecord);
--- End diff --

This should be `copy`, not `castRecord`.
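A plain-Java model of why this matters, assuming that without object reuse each chained operator must get its own copy: if the original record is passed instead of the copy, a downstream mutation leaks back into the upstream value. As written, the diff also sets the key context on `copy` but processes `castRecord`. `Event`, `copy` and `mutatingOperator` are hypothetical stand-ins for the user type, `serializer.copy` and the chained operator.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of copy isolation in a copying chaining output.
final class CopyIsolationDemo {

    /** A mutable record value, standing in for a user type. */
    static final class Event {
        final List<String> tags = new ArrayList<>();
    }

    /** Stands in for serializer.copy(value): returns a deep copy. */
    static Event copy(Event e) {
        Event c = new Event();
        c.tags.addAll(e.tags);
        return c;
    }

    /** A downstream operator that mutates the element it receives. */
    static void mutatingOperator(Event e) {
        e.tags.add("seen");
    }

    /** Pushes to the operator, passing either the copy (correct) or the original (the bug). */
    static void push(Event original, boolean passCopy) {
        Event c = copy(original);
        mutatingOperator(passCopy ? c : original);
    }

    public static void main(String[] args) {
        Event a = new Event();
        push(a, true);
        System.out.println(a.tags);  // [] : upstream value untouched
        Event b = new Event();
        push(b, false);
        System.out.println(b.tags);  // [seen] : mutation leaked upstream
    }
}
```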


[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-15 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r106138473
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java ---
@@ -416,4 +428,26 @@ private boolean canBeParallel() {
transformation.setSlotSharingGroup(slotSharingGroup);
return this;
}
+
+   /**
+* Gets the {@link DataStream} that contains the elements that are emitted from an operation
+* into the side output with the given {@link OutputTag}.
+*
+* @see org.apache.flink.streaming.api.functions.ProcessFunction.Context#output(OutputTag, Object)
+*/
+   public  DataStream getSideOutput(OutputTag sideOutputTag){
+   sideOutputTag = clean(sideOutputTag);
+
+   TypeInformation type = requestedSideOutputs.get(sideOutputTag);
+   if (type != null && !type.equals(sideOutputTag.getTypeInfo())) {
+   throw new UnsupportedOperationException("A side output with a matching id was " +
+   "already requested with a different type. This is not allowed, side output " +
+   "ids need to be unique.");
+   }
+
+   requestedSideOutputs.put(sideOutputTag, sideOutputTag.getTypeInfo());
+
--- End diff --

The `requireNotNull` should be at the beginning of the method.
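A minimal sketch of the suggested ordering, with hypothetical names: the null check runs first, before the tag is cleaned or used as a map key, followed by the existing duplicate-id check. Plain strings stand in for `OutputTag` and `TypeInformation`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical registry modelling the checks in getSideOutput.
final class SideOutputRegistry {

    private final Map<String, String> requestedSideOutputs = new HashMap<>();

    /** tagId and typeName stand in for the OutputTag and its TypeInformation. */
    void request(String tagId, String typeName) {
        // Fail fast: check the argument before it is used anywhere else.
        Objects.requireNonNull(tagId, "side output tag must not be null");

        String existing = requestedSideOutputs.get(tagId);
        if (existing != null && !existing.equals(typeName)) {
            throw new UnsupportedOperationException("A side output with a matching id was "
                + "already requested with a different type. Side output ids must be unique.");
        }
        requestedSideOutputs.put(tagId, typeName);
    }

    public static void main(String[] args) {
        SideOutputRegistry registry = new SideOutputRegistry();
        registry.request("late-data", "Long");
        registry.request("late-data", "Long");        // same id and type: allowed
        try {
            registry.request("late-data", "String");  // same id, different type
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```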


[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-15 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r106138895
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SideOutputTransformation.java ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.transformations;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+
+import java.util.Collection;
+import java.util.List;
+
+
+/**
+ * This transformation represents a selection of a side output of an upstream operation with a
+ * given {@link OutputTag}.
+ *
+ * This does not create a physical operation, it only affects how upstream operations are
+ * connected to downstream operations.
+ *
+ * @param  The type of the elements that result from this {@code SideOutputTransformation}
+ */
--- End diff --

Here we do not check whether `input` is `null` (we only do it in the calling
method), yet we try to get its parallelism. We could pass the parallelism as a
separate argument and then, after the `super()` call, check whether the input is null.
This makes the class self-contained, as you do not have to look at other classes
to see whether `input` can be `null`. What do you think?
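A sketch of the suggested constructor shape, using hypothetical classes rather than the real transformation hierarchy. Because the explicit `super(...)` call must come first in a Java constructor, reading `input.getParallelism()` inside its arguments happens before any null check; passing the parallelism in explicitly lets the class check `input` itself right after `super(...)`.

```java
import java.util.Objects;

// Hypothetical classes (not Flink's transformation hierarchy) showing the suggested shape.
final class SideOutputSelection extends Transformation {
    private final Transformation input;
    private final String tagId;

    SideOutputSelection(Transformation input, int parallelism, String tagId) {
        // super(...) must come first; with parallelism passed in, nothing here
        // dereferences input before it has been checked.
        super("SideOutput", parallelism);
        this.input = Objects.requireNonNull(input, "input must not be null");
        this.tagId = Objects.requireNonNull(tagId, "tag must not be null");
    }

    Transformation getInput() {
        return input;
    }

    String getTagId() {
        return tagId;
    }

    public static void main(String[] args) {
        Transformation upstream = new SourceStub(4);
        // The caller reads the parallelism; the constructor no longer has to.
        SideOutputSelection selection =
            new SideOutputSelection(upstream, upstream.getParallelism(), "rejected");
        System.out.println(selection.getParallelism());  // 4
    }
}

abstract class Transformation {
    private final String name;
    private final int parallelism;

    Transformation(String name, int parallelism) {
        this.name = name;
        this.parallelism = parallelism;
    }

    String getName() {
        return name;
    }

    int getParallelism() {
        return parallelism;
    }
}

/** A trivial concrete transformation used as the upstream input. */
final class SourceStub extends Transformation {
    SourceStub(int parallelism) {
        super("Source", parallelism);
    }
}
```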


[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-15 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r106139530
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ---
@@ -441,26 +486,53 @@ public void close() {
private static final class CopyingChainingOutput extends ChainingOutput {

private final TypeSerializer serializer;
-   
+
public CopyingChainingOutput(
OneInputStreamOperator operator,
TypeSerializer serializer,
+   OutputTag outputTag,
StreamStatusProvider streamStatusProvider) {
-   super(operator, streamStatusProvider);
+   super(operator, streamStatusProvider, outputTag);
this.serializer = serializer;
}
 
@Override
public void collect(StreamRecord record) {
+   if (this.outputTag != null) {
+   // we are only responsible for emitting to the main input
+   return;
+   }
+
+   pushToOperator(record);
+   }
+
+   @Override
+   public  void collect(OutputTag outputTag, StreamRecord record) {
+   if (this.outputTag == null || !this.outputTag.equals(outputTag)) {
+   // we are only responsible for emitting to the side-output specified by our
+   // OutputTag.
+   return;
+   }
+
+   pushToOperator(record);
+   }
+
+   @Override
--- End diff --

This can become `private`, as before.


[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-15 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r106138342
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java ---
@@ -416,4 +428,26 @@ private boolean canBeParallel() {
transformation.setSlotSharingGroup(slotSharingGroup);
return this;
}
+
+   /**
+* Gets the {@link DataStream} that contains the elements that are emitted from an operation
+* into the side output with the given {@link OutputTag}.
+*
+* @see org.apache.flink.streaming.api.functions.ProcessFunction.Context#output(OutputTag, Object)
+*/
+   public  DataStream getSideOutput(OutputTag sideOutputTag){
+   sideOutputTag = clean(sideOutputTag);
--- End diff --

I think it is better not to reassign the argument variable but to create a new
local one.


[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-15 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r106139079
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---
@@ -567,6 +600,17 @@ protected boolean isLate(W window) {
}
 
/**
+* Decide if a record is currently late, based on current watermark and allowed lateness.
+*
+* @param element The element to check
+* @return The element for which should be considered when sideoutputs
+*/
+   protected boolean isLate(StreamRecord element){
--- End diff --

This is not used any more, right? So it can be deleted.


[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-15 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r106138237
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java ---
@@ -416,4 +428,26 @@ private boolean canBeParallel() {
transformation.setSlotSharingGroup(slotSharingGroup);
return this;
}
+
+   /**
+* Gets the {@link DataStream} that contains the elements that are emitted from an operation
+* into the side output with the given {@link OutputTag}.
+*
+* @see org.apache.flink.streaming.api.functions.ProcessFunction.Context#output(OutputTag, Object)
+*/
--- End diff --

Missing space between the `)` and the `{`.


[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-15 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r106138062
  
--- Diff: flink-core/src/main/java/org/apache/flink/util/OutputTag.java ---
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.util;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+
+/**
+ * An {@link OutputTag} is a typed and named tag to use for tagging side outputs
+ * of an operator.
+ *
+ * An {@code OutputTag} must always be an anonymous inner class so that Flink can derive
+ * a {@link TypeInformation} for the generic type parameter.
+ *
+ * Example:
+ * {@code
+ * OutputTag> info = new OutputTag>("late-data"){});
+ * }
+ *
+ * @param  the type of elements in the side-output stream.
+ */
+@PublicEvolving
+public class OutputTag implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   private final String id;
+
+   private transient TypeInformation typeInfo;
+
+   /**
+* Creates a new named {@code OutputTag} with the given id.
+*
+* @param id The id of the created {@code OutputTag}.
+ */
+   public OutputTag(String id) {
+   Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
+   this.id = requireNonNull(id);
+
+   try {
+   TypeHint typeHint =
+   new TypeHint(OutputTag.class, this, 0) {};
+   this.typeInfo = typeHint.getTypeInfo();
+   } catch (InvalidTypesException e) {
+   throw new InvalidTypesException("Could not determine TypeInformation for generic " +
+   "OutputTag type. Did you forget to make your OutputTag an anonymous inner class?", e);
+   }
+   }
+
+   /**
+* Creates a new named {@code OutputTag} with the given id and output {@link TypeInformation}.
+*
+* @param id The id of the created {@code OutputTag}.
+* @param typeInfo The {@code TypeInformation} for the side output.
+*/
+   public OutputTag(String id, TypeInformation typeInfo) {
+   this.id = Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
+   this.typeInfo =
+   Preconditions.checkNotNull(typeInfo, "TypeInformation cannot be null.");
+   }
+
+   private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+   in.defaultReadObject();
+   typeInfo = null;
+   }
+
+   public String getId() {
+   return id;
+   }
+
+   public TypeInformation getTypeInfo() {
+   return typeInfo;
+   }
+
+   @Override
+   public boolean equals(Object obj) {
+   return obj instanceof OutputTag
--- End diff --

My first comment still applies: the `equals` can be simplified, given that
`id != null`.
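A hypothetical, simplified tag class (not Flink's `OutputTag`) that illustrates two points from this file: why the tag must be an anonymous subclass (the generic argument is then recorded in the class's generic superclass and can be read via reflection), and how `equals`/`hashCode` reduce to the id once `id != null` is enforced in the constructor. Flink's own type extraction via `TypeHint` is more involved; this only shows the underlying idea.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Objects;

// Hypothetical, simplified tag class; not Flink's OutputTag.
class Tag<T> {
    private final String id;
    private final Type elementType;

    Tag(String id) {
        this.id = Objects.requireNonNull(id, "Tag id cannot be null.");
        // For `new Tag<String>("x") {}` the generic superclass is Tag<String>.
        Type superType = getClass().getGenericSuperclass();
        if (!(superType instanceof ParameterizedType)) {
            throw new IllegalStateException(
                "Could not determine the element type. Did you forget to make the Tag an anonymous subclass?");
        }
        this.elementType = ((ParameterizedType) superType).getActualTypeArguments()[0];
    }

    String getId() {
        return id;
    }

    Type getElementType() {
        return elementType;
    }

    @Override
    public boolean equals(Object obj) {
        // id is never null, so no null handling is needed on either side.
        return obj instanceof Tag && ((Tag<?>) obj).id.equals(id);
    }

    @Override
    public int hashCode() {
        return id.hashCode();
    }

    public static void main(String[] args) {
        Tag<String> rejected = new Tag<String>("rejected") {};
        System.out.println(rejected.getElementType());                     // class java.lang.String
        System.out.println(rejected.equals(new Tag<Integer>("rejected") {})); // true: id only
    }
}
```

Comparing only the id is consistent with `getSideOutput` above, which relies on equal ids to detect a conflicting type for the same side output.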


[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-15 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r106139311
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ---
@@ -326,33 +327,46 @@ public int getChainLength() {
Map chainedConfigs,
ClassLoader userCodeClassloader,
Map streamOutputs,
-   List allOperators)
+   List allOperators,
+   OutputTag outputTag)
{
// create the output that the operator writes to first. this may recursively create more operators
Output output = createOutputCollector(
containingTask, operatorConfig, chainedConfigs, userCodeClassloader, streamOutputs, allOperators);
 
// now create the operator and give it the output collector to write its output to
OneInputStreamOperator chainedOperator = operatorConfig.getStreamOperator(userCodeClassloader);
+
chainedOperator.setup(containingTask, operatorConfig, output);
 
allOperators.add(chainedOperator);
 
if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
-   return new ChainingOutput<>(chainedOperator, this);
+   return new ChainingOutput<>(chainedOperator, this, outputTag);
}
else {
TypeSerializer inSerializer = operatorConfig.getTypeSerializerIn1(userCodeClassloader);
-   return new CopyingChainingOutput<>(chainedOperator, inSerializer, this);
+   return new CopyingChainingOutput<>(chainedOperator, inSerializer, outputTag, this);
}
}

private  RecordWriterOutput createStreamOutput(
StreamEdge edge, StreamConfig upStreamConfig, int outputIndex,
Environment taskEnvironment,
-   String taskName)
-   {
-   TypeSerializer outSerializer = upStreamConfig.getTypeSerializerOut(taskEnvironment.getUserClassLoader());
+   String taskName) {
+   OutputTag sideOutputTag = edge.getOutputTag(); // OutputTag, return null if not sideOutput
+
+   TypeSerializer outSerializer = null;
+
+   if (edge.getOutputTag() != null) {
+   // side output
+   outSerializer = upStreamConfig.getTypeSerializerSideOut(
+   edge.getOutputTag(), taskEnvironment.getUserClassLoader());
+   } else {
+   // main output
--- End diff --

This can become one line.
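It is not obvious from the hunk which statement is meant; one reading is that the serializer selection can be collapsed into a single expression that reads the tag once. Below is a hypothetical model of that selection, with strings standing in for `TypeSerializer` instances and a map standing in for the per-tag side-output serializers.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of choosing an output serializer per edge.
final class SerializerChoice {

    /** Returns the side-output serializer for a tagged edge, else the main-output serializer. */
    static String serializerFor(String edgeOutputTag, String mainSerializer,
                                Map<String, String> sideSerializers) {
        // The tag is read once and the choice is a single expression.
        return edgeOutputTag != null ? sideSerializers.get(edgeOutputTag) : mainSerializer;
    }

    public static void main(String[] args) {
        Map<String, String> sideSerializers = new HashMap<>();
        sideSerializers.put("rejected", "StringSerializer");
        System.out.println(serializerFor(null, "Tuple2Serializer", sideSerializers));       // Tuple2Serializer
        System.out.println(serializerFor("rejected", "Tuple2Serializer", sideSerializers)); // StringSerializer
    }
}
```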


[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-15 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r106139449
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ---
@@ -387,18 +401,49 @@ public int getChainLength() {
 
protected final StreamStatusProvider streamStatusProvider;
 
-   public ChainingOutput(OneInputStreamOperator operator, StreamStatusProvider streamStatusProvider) {
+   protected final OutputTag outputTag;
+
+   public ChainingOutput(
+   OneInputStreamOperator operator,
+   StreamStatusProvider streamStatusProvider,
+   OutputTag outputTag) {
this.operator = operator;
this.numRecordsIn = ((OperatorMetricGroup) operator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
this.streamStatusProvider = streamStatusProvider;
+   this.outputTag = outputTag;
}
 
@Override
public void collect(StreamRecord record) {
+   if (this.outputTag != null) {
+   // we are only responsible for emitting to the main input
+   return;
+   }
+
+   pushToOperator(record);
+   }
+
+   @Override
+   public  void collect(OutputTag outputTag, StreamRecord record) {
+   if (this.outputTag == null || !this.outputTag.equals(outputTag)) {
+   // we are only responsible for emitting to the side-output specified by our
+   // OutputTag.
+   return;
+   }
+
+   pushToOperator(record);
+   }
+
--- End diff --

This can become `private`, since the copying alternative has its own
implementation, right?
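A plain-Java model of the routing these two `collect` methods implement, with hypothetical names: each chained output is bound either to the main output (`outputTag == null`) or to exactly one side-output tag, and forwards only matching records. Here `pushToOperator` is `private`, as suggested; since the copying subclass has its own logic, it would then keep its own private helper rather than override this one.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of tag-based routing in a chained output.
final class RoutingOutput {
    private final String outputTag;                 // null means "main output"
    private final List<String> received = new ArrayList<>();

    RoutingOutput(String outputTag) {
        this.outputTag = outputTag;
    }

    /** Main-output records: forwarded only by an output without a tag. */
    void collect(String record) {
        if (outputTag != null) {
            return;
        }
        pushToOperator(record);
    }

    /** Side-output records: forwarded only by the output bound to that tag. */
    void collect(String tag, String record) {
        if (outputTag == null || !outputTag.equals(tag)) {
            return;
        }
        pushToOperator(record);
    }

    private void pushToOperator(String record) {
        received.add(record);                       // stands in for operator.processElement(...)
    }

    List<String> received() {
        return received;
    }

    public static void main(String[] args) {
        RoutingOutput mainOut = new RoutingOutput(null);
        RoutingOutput rejectedOut = new RoutingOutput("rejected");
        for (RoutingOutput out : new RoutingOutput[] {mainOut, rejectedOut}) {
            out.collect("hello");                   // main-output record
            out.collect("rejected", "question");    // side output "rejected"
            out.collect("late", "old-event");       // a tag neither output is bound to
        }
        System.out.println(mainOut.received());     // [hello]
        System.out.println(rejectedOut.received()); // [question]
    }
}
```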


[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-10 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105441679
  
--- Diff: flink-core/src/main/java/org/apache/flink/util/OutputTag.java ---
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.util;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+
+/**
+ * An {@link OutputTag} is a typed and named tag to use for tagging side outputs
+ * of an operator.
+ *
+ * An {@code OutputTag} must always be an anonymous inner class so that Flink can derive
+ * a {@link TypeInformation} for the generic type parameter.
+ *
+ * Example:
+ * {@code
+ * OutputTag> info = new OutputTag>("late-data"){});
+ * }
+ *
+ * @param  the type of elements in the side-output stream.
+ */
+@PublicEvolving
+public class OutputTag implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   private final String id;
+
+   private transient TypeInformation typeInfo;
+
+   /**
+* Creates a new named {@code OutputTag} with the given id.
+*
+* @param id The id of the created {@code OutputTag}.
+ */
+   public OutputTag(String id) {
+   Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
+   this.id = requireNonNull(id);
+
+   try {
+   TypeHint typeHint =
+   new TypeHint(OutputTag.class, this, 0) {};
+   this.typeInfo = typeHint.getTypeInfo();
+   } catch (InvalidTypesException e) {
+   throw new InvalidTypesException("Could not determine TypeInformation for generic " +
+   "OutputTag type. Did you forget to make your OutputTag an anonymous inner class?", e);
+   }
+   }
+
+   /**
+* Creates a new named {@code OutputTag} with the given id and output {@link TypeInformation}.
+*
+* @param id The id of the created {@code OutputTag}.
+* @param typeInfo The {@code TypeInformation} for the side output.
+*/
+   public OutputTag(String id, TypeInformation typeInfo) {
+   this.id = Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
+   this.typeInfo =
+   Preconditions.checkNotNull(typeInfo, "TypeInformation cannot be null.");
+   }
+
+   private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+   in.defaultReadObject();
+   typeInfo = null;
+   }
+
+   public String getId() {
+   return id;
+   }
+
+   public TypeInformation getTypeInfo() {
+   return typeInfo;
+   }
+
+   @Override
+   public boolean equals(Object obj) {
+   return obj instanceof OutputTag
--- End diff --

I see. The problem is that if this does not work, then we can have 
important side effects.




[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-10 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105441456
  
--- Diff: flink-core/src/main/java/org/apache/flink/util/OutputTag.java ---
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.util;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+
+/**
+ * An {@link OutputTag} is a typed and named tag to use for tagging side outputs
+ * of an operator.
+ *
+ * An {@code OutputTag} must always be an anonymous inner class so that Flink can derive
+ * a {@link TypeInformation} for the generic type parameter.
+ *
+ * Example:
+ * {@code
+ * OutputTag> info = new OutputTag>("late-data"){});
+ * }
+ *
+ * @param  the type of elements in the side-output stream.
+ */
+@PublicEvolving
+public class OutputTag implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   private final String id;
+
+   private transient TypeInformation typeInfo;
+
+   /**
+* Creates a new named {@code OutputTag} with the given id.
+*
+* @param id The id of the created {@code OutputTag}.
+ */
+   public OutputTag(String id) {
+   Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
+   this.id = requireNonNull(id);
+
+   try {
+   TypeHint typeHint =
+   new TypeHint(OutputTag.class, this, 0) {};
+   this.typeInfo = typeHint.getTypeInfo();
+   } catch (InvalidTypesException e) {
+   throw new InvalidTypesException("Could not determine TypeInformation for generic " +
+   "OutputTag type. Did you forget to make your OutputTag an anonymous inner class?", e);
+   }
+   }
+
+   /**
+* Creates a new named {@code OutputTag} with the given id and output {@link TypeInformation}.
+*
+* @param id The id of the created {@code OutputTag}.
+* @param typeInfo The {@code TypeInformation} for the side output.
+*/
+   public OutputTag(String id, TypeInformation typeInfo) {
+   this.id = Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
+   this.typeInfo =
+   Preconditions.checkNotNull(typeInfo, "TypeInformation cannot be null.");
+   }
+
+   private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+   in.defaultReadObject();
+   typeInfo = null;
+   }
+
+   public String getId() {
+   return id;
+   }
+
+   public TypeInformation getTypeInfo() {
+   return typeInfo;
+   }
+
+   @Override
+   public boolean equals(Object obj) {
+   return obj instanceof OutputTag
--- End diff --

I would have liked to include the `TypeInformation` in the check, but we 
can't do that because it's transient. I'll try to figure something out for 
checking that side outputs are unique; it's not as easy as it seems.
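`typeInfo` is `transient` and is explicitly reset in `readObject()`. As a result, only the `id` survives serialization, which is why `equals()` can only compare ids. The sketch below demonstrates this with plain Java serialization. `SimpleTag` is hypothetical, and a `Class<?>` stands in for `TypeInformation`. The last check in the usage shows the consequence: two tags with the same id but different types compare equal. That is exactly the uniqueness problem raised here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Objects;

// Hypothetical stand-in for OutputTag; Class<?> replaces TypeInformation.
class SimpleTag implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String id;
    // Like OutputTag's typeInfo, this field is not serialized.
    private transient Class<?> type;

    SimpleTag(String id, Class<?> type) {
        this.id = Objects.requireNonNull(id, "id cannot be null.");
        this.type = Objects.requireNonNull(type, "type cannot be null.");
    }

    String getId() {
        return id;
    }

    Class<?> getType() {
        return type;
    }

    // id can never be null (checked in the constructor), so no null handling.
    @Override
    public boolean equals(Object obj) {
        return obj instanceof SimpleTag && id.equals(((SimpleTag) obj).id);
    }

    @Override
    public int hashCode() {
        return id.hashCode();
    }

    // Serializes and deserializes a tag through an in-memory buffer.
    static SimpleTag roundTrip(SimpleTag tag) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(tag);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (SimpleTag) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```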




[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-10 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105436408
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
 ---
@@ -539,5 +625,26 @@ public void collect(StreamRecord record) {
// don't copy for the last output
outputs[outputs.length - 1].collect(record);
}
+
--- End diff --

fixed.




[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-10 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105436193
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
 ---
@@ -387,14 +403,25 @@ public int getChainLength() {
 
protected final StreamStatusProvider streamStatusProvider;
 
-   public ChainingOutput(OneInputStreamOperator operator, StreamStatusProvider streamStatusProvider) {
+   protected final OutputTag outputTag;
+
+   public ChainingOutput(
+   OneInputStreamOperator operator,
+   StreamStatusProvider streamStatusProvider,
+   OutputTag outputTag) {
this.operator = operator;
this.numRecordsIn = ((OperatorMetricGroup) operator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
this.streamStatusProvider = streamStatusProvider;
+   this.outputTag = outputTag;
}
 
@Override
--- End diff --

This I'm fixing, as I mentioned above.




[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-10 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105435806
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
 ---
@@ -72,6 +76,11 @@ public RecordWriterOutput(
 
--- End diff --

I can remove the duplication in `RecordWriterOutput`, `ChainingOutput` 
and `CopyingChainingOutput`, i.e. those `Outputs` that don't forward to 
other outputs but instead push into the operator or into the network. For the 
other `Outputs`, removing the duplication is not possible because inside each 
`collect()` method they call `collect()` of another `Output`, either with an 
`OutputTag` or without, so the method bodies are not actually duplicates.

I did find another bug, though, where `CopyingBroadcastingOutputCollector` 
in `OperatorChain` was not calling the correct `collect()` method on the 
downstream `Outputs`. 😃 
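The distinction can be sketched as follows. A forwarding output, in the spirit of `BroadcastingOutputCollector`, must call the *same* `collect()` overload on every downstream output that it was itself called with. Otherwise side-output records would arrive downstream as main-output records, which is the kind of bug described for `CopyingBroadcastingOutputCollector`. `SimpleOutput` and the classes below are hypothetical simplifications, not Flink's `Output` API, and tags are plain `String` ids:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simplification of Flink's Output interface.
interface SimpleOutput<T> {
    void collect(T record);                 // main output
    <X> void collect(String tag, X record); // side output identified by a tag id
}

// Leaf output that records everything it receives, for inspection.
final class RecordingOutput<T> implements SimpleOutput<T> {
    final List<String> received = new ArrayList<>();

    @Override
    public void collect(T record) {
        received.add("main:" + record);
    }

    @Override
    public <X> void collect(String tag, X record) {
        received.add(tag + ":" + record);
    }
}

// Forwarding output: each overload delegates to the same overload downstream,
// so the two bodies look alike but are not duplicates.
final class BroadcastingOutput<T> implements SimpleOutput<T> {
    private final List<SimpleOutput<T>> outputs;

    BroadcastingOutput(List<SimpleOutput<T>> outputs) {
        this.outputs = outputs;
    }

    @Override
    public void collect(T record) {
        for (SimpleOutput<T> output : outputs) {
            output.collect(record);
        }
    }

    @Override
    public <X> void collect(String tag, X record) {
        for (SimpleOutput<T> output : outputs) {
            output.collect(tag, record); // must not fall back to output.collect(record)
        }
    }
}
```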




[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-10 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105398836
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
 ---
@@ -300,6 +303,36 @@ private StreamGraph generateInternal(List transformatio
}
 
/**
+* Transforms a {@code SideOutputTransformation}.
+*
+* 
+* For this we create a virtual node in the {@code StreamGraph} that holds the side-output
+* {@link org.apache.flink.util.OutputTag}.
+*
+* @see org.apache.flink.streaming.api.graph.StreamGraphGenerator
+*/
+   private  Collection transformSideOutput(SideOutputTransformation sideOutput) {
+   StreamTransformation input = sideOutput.getInput();
+   Collection resultIds = transform(input);
+
+
+   // the recursive transform might have already transformed this
+   if (alreadyTransformed.containsKey(sideOutput)) {
+   return alreadyTransformed.get(sideOutput);
+   }
+
+   List virtualResultIds = new ArrayList<>();
+
+   for (int inputId : resultIds) {
+   int virtualId = StreamTransformation.getNewNodeId();
+   streamGraph.addVirtualSideOutputNode(inputId, virtualId, sideOutput.getOutputTag());
+   virtualResultIds.add(virtualId);
+   }
+   return virtualResultIds;
+   }
+
+
--- End diff --

Leave only one empty line.




[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-10 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105402746
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
 ---
@@ -1528,14 +1572,16 @@ public void testDropDueToLatenessSessionZeroLatenessPurgingTrigger() throws Exce
stateDesc,
new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()),
PurgingTrigger.of(EventTimeTrigger.create()),
-   LATENESS);
+   LATENESS,
+   lateOutputTag);
--- End diff --

wrong alignment




[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-10 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105409533
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
 ---
@@ -72,6 +76,11 @@ public RecordWriterOutput(
 
--- End diff --

We can't throw, because that would crash the program. This is a good catch, 
though! I will remove the duplication.




[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-10 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105401808
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
 ---
@@ -441,26 +491,55 @@ public void close() {
private static final class CopyingChainingOutput extends ChainingOutput {

private final TypeSerializer serializer;
-   
+
public CopyingChainingOutput(
OneInputStreamOperator operator,
TypeSerializer serializer,
+   OutputTag outputTag,
StreamStatusProvider streamStatusProvider) {
-   super(operator, streamStatusProvider);
+   super(operator, streamStatusProvider, outputTag);
this.serializer = serializer;
}
 
--- End diff --

Again, the two `collect()` methods have duplicate code (after the cast).
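Another way to avoid this duplication, sketched here under simplifying assumptions, is to keep the tag checks and the cast in the base class only. The copying variant then overrides just the step that hands the record to the operator. `RoutingOutput`, `CopyingRoutingOutput` and `push()` are hypothetical names. A `UnaryOperator` stands in for `TypeSerializer#copy`, and tags are plain `String` ids:

```java
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

// Hypothetical simplification of ChainingOutput: the tag checks live here only.
class RoutingOutput<T> {
    protected final Consumer<T> operator; // stands in for the chained operator
    private final String outputTag;       // null means "main output"

    RoutingOutput(Consumer<T> operator, String outputTag) {
        this.operator = operator;
        this.outputTag = outputTag;
    }

    final void collect(T record) {
        if (outputTag == null) {
            push(record);
        }
    }

    @SuppressWarnings("unchecked")
    final <X> void collect(String tag, X record) {
        if (outputTag != null && outputTag.equals(tag)) {
            // The tag matches, so the record has the type our operator expects.
            push((T) record);
        }
    }

    // The only step that differs between the plain and the copying variant.
    protected void push(T record) {
        operator.accept(record);
    }
}

// Hypothetical simplification of CopyingChainingOutput.
final class CopyingRoutingOutput<T> extends RoutingOutput<T> {
    private final UnaryOperator<T> copier; // stands in for TypeSerializer#copy

    CopyingRoutingOutput(Consumer<T> operator, UnaryOperator<T> copier, String outputTag) {
        super(operator, outputTag);
        this.copier = copier;
    }

    @Override
    protected void push(T record) {
        // Hand a copy to the operator so it cannot modify the caller's object.
        operator.accept(copier.apply(record));
    }
}
```

The trade-off of this design is that `push()` has to be `protected` rather than `private`.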




[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-10 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105397306
  
--- Diff: flink-core/src/main/java/org/apache/flink/util/OutputTag.java ---
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.util;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+
+/**
+ * An {@link OutputTag} is a typed and named tag to use for tagging side outputs
+ * of an operator.
+ *
+ * An {@code OutputTag} must always be an anonymous inner class so that Flink can derive
+ * a {@link TypeInformation} for the generic type parameter.
+ *
+ * Example:
+ * {@code
+ * OutputTag> info = new OutputTag>("late-data"){});
+ * }
+ *
+ * @param  the type of elements in the side-output stream.
+ */
+@PublicEvolving
+public class OutputTag implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   private final String id;
+
+   private transient TypeInformation typeInfo;
+
+   /**
+* Creates a new named {@code OutputTag} with the given id.
+*
+* @param id The id of the created {@code OutputTag}.
+ */
+   public OutputTag(String id) {
+   Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
--- End diff --

We do not need both lines with the checks. We can just have:

`this.id = Preconditions.checkNotNull(id, "OutputTag id cannot be null.");`
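The two lines can be merged because `Preconditions.checkNotNull(reference, errorMessage)` returns the reference it checked, just like the JDK's `Objects.requireNonNull(obj, message)`. The second constructor in this diff already uses that form. Here is a minimal sketch using the JDK method and a hypothetical `NamedTag` class:

```java
import java.util.Objects;

// Hypothetical class showing the validate-and-assign idiom.
final class NamedTag {
    private final String id;

    NamedTag(String id) {
        // One statement: throws NullPointerException with the message if id is null,
        // otherwise returns id so it can be assigned directly.
        this.id = Objects.requireNonNull(id, "OutputTag id cannot be null.");
    }

    String getId() {
        return id;
    }
}
```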




[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-10 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105403355
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java
 ---
@@ -40,6 +41,12 @@ public void collect(StreamRecord record) {
}
 
@Override
+   public  void collect(
--- End diff --

The signature fits on one line.




[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-10 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105399457
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SideOutputTransformation.java
 ---
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.transformations;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+
+import java.util.Collection;
+import java.util.List;
+
+
+/**
+ * This transformation represents a selection of a side output of an upstream operation with a
+ * given {@link OutputTag}.
+ *
+ * This does not create a physical operation, it only affects how upstream operations are
+ * connected to downstream operations.
+ *
+ * @param  The type of the elements that result from this {@code SideOutputTransformation}
+ */
+public class SideOutputTransformation extends StreamTransformation {
+   private final StreamTransformation input;
--- End diff --

Leave a blank line here.




[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-10 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105397096
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeHint.java ---
@@ -46,7 +46,15 @@
public TypeHint() {
this.typeInfo = TypeExtractor.createTypeInfo(this, TypeHint.class, getClass(), 0);
}
-   
+
+   /**
+* Creates a hint for the generic type in the class signature.
+*/
+   public TypeHint(Class baseClass, Object instance, int genericParameterPos) {
+   this.typeInfo = TypeExtractor.createTypeInfo(instance, baseClass, instance.getClass(), genericParameterPos);
+   }
+
+
--- End diff --

Remove one of the 2 empty lines.
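The new constructor receives the tag instance itself, so the type extractor can inspect the generic superclass of the tag's runtime class. The underlying Java mechanism, and why it requires an anonymous subclass, can be shown with plain reflection. This is a simplified illustration with a hypothetical `GenericTag` class, not how `TypeExtractor` is actually implemented:

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical class illustrating why OutputTag must be subclassed anonymously.
class GenericTag<T> {
    // Returns the type argument recorded in the superclass signature of the
    // runtime class, or null if there is none (the argument was erased).
    Type typeArgument() {
        Type superType = getClass().getGenericSuperclass();
        if (superType instanceof ParameterizedType) {
            return ((ParameterizedType) superType).getActualTypeArguments()[0];
        }
        return null;
    }
}
```

`new GenericTag<String>() {}` declares an anonymous class whose superclass signature is `GenericTag<String>`, so `typeArgument()` returns `String.class`. For a plain `new GenericTag<String>()`, the runtime class is `GenericTag` itself, its generic superclass is `Object`, and the method returns `null`. This corresponds to the case the `InvalidTypesException` message in the `OutputTag` constructor warns about.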




[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-10 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105397933
  
--- Diff: flink-core/src/main/java/org/apache/flink/util/OutputTag.java ---
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.util;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+
+/**
+ * An {@link OutputTag} is a typed and named tag to use for tagging side outputs
+ * of an operator.
+ *
+ * An {@code OutputTag} must always be an anonymous inner class so that Flink can derive
+ * a {@link TypeInformation} for the generic type parameter.
+ *
+ * Example:
+ * {@code
+ * OutputTag> info = new OutputTag>("late-data"){});
+ * }
+ *
+ * @param  the type of elements in the side-output stream.
+ */
+@PublicEvolving
+public class OutputTag implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   private final String id;
+
+   private transient TypeInformation typeInfo;
+
+   /**
+* Creates a new named {@code OutputTag} with the given id.
+*
+* @param id The id of the created {@code OutputTag}.
+ */
+   public OutputTag(String id) {
+   Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
+   this.id = requireNonNull(id);
+
+   try {
+   TypeHint typeHint =
+   new TypeHint(OutputTag.class, this, 0) {};
+   this.typeInfo = typeHint.getTypeInfo();
+   } catch (InvalidTypesException e) {
+   throw new InvalidTypesException("Could not determine TypeInformation for generic " +
+   "OutputTag type. Did you forget to make your OutputTag an anonymous inner class?", e);
+   }
+   }
+
+   /**
+* Creates a new named {@code OutputTag} with the given id and output {@link TypeInformation}.
+*
+* @param id The id of the created {@code OutputTag}.
+* @param typeInfo The {@code TypeInformation} for the side output.
+*/
+   public OutputTag(String id, TypeInformation typeInfo) {
+   this.id = Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
+   this.typeInfo =
+   Preconditions.checkNotNull(typeInfo, "TypeInformation cannot be null.");
+   }
+
+   private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+   in.defaultReadObject();
+   typeInfo = null;
+   }
+
+   public String getId() {
+   return id;
+   }
+
+   public TypeInformation getTypeInfo() {
+   return typeInfo;
+   }
+
+   @Override
+   public boolean equals(Object obj) {
+   return obj instanceof OutputTag
--- End diff --

Two points:
1) we cannot have `this.id == null` or `((OutputTag) obj).id == null` 
because we check for that in the constructor, so this method can be simplified.
2) we never check for uniqueness of the `outputTag.id`. We should do it at 
translation time. This is also a correctness issue, as it may result in 
undesired side-output "collisions".
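For point 2, one option is a check during translation. It would remember the type registered for each tag id and reject a second side output with the same id but a different type. Reusing the same id with the same type stays allowed, so requesting the same side output twice is not flagged. This is a sketch under assumptions: `SideOutputRegistry` and `register()` are hypothetical. A `Class<?>` stands in for `TypeInformation`, which should still be available at that point because the tags have not been serialized yet:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical translation-time check for colliding side-output ids.
final class SideOutputRegistry {
    // Maps a tag id to the type first registered for it.
    private final Map<String, Class<?>> typesById = new HashMap<>();

    void register(String id, Class<?> type) {
        Objects.requireNonNull(id, "id cannot be null.");
        Objects.requireNonNull(type, "type cannot be null.");
        Class<?> existing = typesById.putIfAbsent(id, type);
        if (existing != null && !existing.equals(type)) {
            throw new IllegalStateException("Side output id '" + id
                + "' is used with two different types: "
                + existing.getName() + " and " + type.getName());
        }
    }
}
```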




[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-10 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105401503
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
 ---
@@ -387,14 +403,25 @@ public int getChainLength() {
 
protected final StreamStatusProvider streamStatusProvider;
 
-   public ChainingOutput(OneInputStreamOperator operator, StreamStatusProvider streamStatusProvider) {
+   protected final OutputTag outputTag;
+
+   public ChainingOutput(
+   OneInputStreamOperator operator,
+   StreamStatusProvider streamStatusProvider,
+   OutputTag outputTag) {
this.operator = operator;
this.numRecordsIn = ((OperatorMetricGroup) operator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
this.streamStatusProvider = streamStatusProvider;
+   this.outputTag = outputTag;
}
 
@Override
--- End diff --

Again, the two `collect()` methods share a lot of identical code. We can move 
that common code into a separate private method and call it instead of 
repeating the code.




[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-10 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105402262
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
 ---
@@ -539,5 +625,26 @@ public void collect(StreamRecord record) {
// don't copy for the last output
outputs[outputs.length - 1].collect(record);
}
+
+   @Override
+   public  void collect(
+   OutputTag outputTag, StreamRecord record) {
+   for (int i = 0; i < outputs.length - 1; i++) {
+   Output output = outputs[i];
+
+   // due to side outputs, StreamRecords of varying types can pass through the broadcasting
+   // collector so we need to cast
+   @SuppressWarnings({"unchecked", "rawtypes"})
+   StreamRecord shallowCopy = (StreamRecord) record.copy(record.getValue());
+   output.collect(shallowCopy);
+   }
+
--- End diff --

This duplicates the comment inside the loop. Also, the "// don't copy for 
the last output" comment is not right, because we actually create a copy before.
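For comparison, the behaviour the `// don't copy for the last output` comment refers to can be sketched as follows. Every output except the last receives its own shallow copy of the record wrapper, and the last one receives the original. `Envelope` (a stand-in for `StreamRecord`) and `CopyingBroadcaster` are hypothetical simplifications:

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical mutable record wrapper standing in for StreamRecord.
final class Envelope<T> {
    T value;
    long timestamp;

    Envelope(T value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }

    // Shallow copy: a new wrapper around the same value reference.
    Envelope<T> shallowCopy() {
        return new Envelope<>(value, timestamp);
    }
}

// Hypothetical simplification of a copying broadcasting collector.
final class CopyingBroadcaster<T> {
    private final List<Consumer<Envelope<T>>> outputs;

    CopyingBroadcaster(List<Consumer<Envelope<T>>> outputs) {
        this.outputs = outputs;
    }

    void collect(Envelope<T> record) {
        if (outputs.isEmpty()) {
            return;
        }
        // Every output but the last gets its own wrapper, so changes one
        // output makes to its wrapper are not visible to the others.
        for (int i = 0; i < outputs.size() - 1; i++) {
            outputs.get(i).accept(record.shallowCopy());
        }
        // The last output can receive the original without copying, since
        // no other output of this collector sees the record afterwards.
        outputs.get(outputs.size() - 1).accept(record);
    }
}
```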




[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-10 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105403240
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectorOutput.java
 ---
@@ -53,5 +54,11 @@ public void collect(StreamRecord record) {
}
 
@Override
--- End diff --

The signature can go on the same line




[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-10 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105398702
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
 ---
@@ -333,32 +356,40 @@ public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int ty
downStreamVertexID,
typeNumber,
null,
-   new ArrayList());
+   new ArrayList(), null);
 
}
 
private void addEdgeInternal(Integer upStreamVertexID,
Integer downStreamVertexID,
int typeNumber,
StreamPartitioner partitioner,
-   List outputNames) {
-
+   List outputNames,
+   OutputTag outputTag) {
 
-   if (virtualSelectNodes.containsKey(upStreamVertexID)) {
+   if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
+   int virtualId = upStreamVertexID;
+   upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;
+   if (outputTag == null) {
+   outputTag = virtualSideOutputNodes.get(virtualId).f1;
+   }
+   addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, null, outputTag);
+   } else if (virtualSelectNodes.containsKey(upStreamVertexID)) {
int virtualId = upStreamVertexID;
upStreamVertexID = virtualSelectNodes.get(virtualId).f0;
if (outputNames.isEmpty()) {
// selections that happen downstream override earlier selections
outputNames = virtualSelectNodes.get(virtualId).f1;
}
-   addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames);
+   addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag);
} else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
int virtualId = upStreamVertexID;
upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
if (partitioner == null) {
partitioner = virtualPartitionNodes.get(virtualId).f1;
}
-   addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames);
+
--- End diff --

Remove this blank line for uniformity with the `if() ...` branches above.


[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-10 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105400890
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
 ---
@@ -326,33 +327,48 @@ public int getChainLength() {
Map chainedConfigs,
ClassLoader userCodeClassloader,
Map streamOutputs,
-   List allOperators)
+   List allOperators,
+   OutputTag outputTag)
{
// create the output that the operator writes to first. this may recursively create more operators
Output output = createOutputCollector(
containingTask, operatorConfig, chainedConfigs, userCodeClassloader, streamOutputs, allOperators);
 
// now create the operator and give it the output collector to write its output to
OneInputStreamOperator chainedOperator = operatorConfig.getStreamOperator(userCodeClassloader);
+
chainedOperator.setup(containingTask, operatorConfig, output);
 
allOperators.add(chainedOperator);
 
if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
-   return new ChainingOutput<>(chainedOperator, this);
+   return new ChainingOutput<>(chainedOperator, this, outputTag);
}
else {
TypeSerializer inSerializer = operatorConfig.getTypeSerializerIn1(userCodeClassloader);
-   return new CopyingChainingOutput<>(chainedOperator, inSerializer, this);
+   return new CopyingChainingOutput<>(chainedOperator, inSerializer, outputTag, this);
}
}

private  RecordWriterOutput createStreamOutput(
StreamEdge edge, StreamConfig upStreamConfig, int outputIndex,
Environment taskEnvironment,
-   String taskName)
-   {
-   TypeSerializer outSerializer = upStreamConfig.getTypeSerializerOut(taskEnvironment.getUserClassLoader());
+   String taskName) {
+   OutputTag sideOutputTag = edge.getOutputTag(); // OutputTag, return null if not sideOutput
+
+   TypeSerializer outSerializer = null;
+
--- End diff --

There is too much line breaking in the following `if() else()`. We could easily reduce it, or even replace it with:
```
TypeSerializer outSerializer = (edge.getOutputTag() != null) ?
    upStreamConfig.getTypeSerializerSideOut(edge.getOutputTag(), taskEnvironment.getUserClassLoader()) :
    upStreamConfig.getTypeSerializerOut(taskEnvironment.getUserClassLoader());
```


[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-10 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105402830
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
 ---
@@ -1618,14 +1664,16 @@ public void testDropDueToLatenessSessionZeroLateness() throws Exception {
stateDesc,
new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()),
EventTimeTrigger.create(),
-   LATENESS);
+   LATENESS,
--- End diff --

wrong alignment


[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-10 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105402616
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
 ---
@@ -1267,14 +1291,16 @@ public void testLateness() throws Exception {
stateDesc,
new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction>()),
PurgingTrigger.of(EventTimeTrigger.create()),
-   LATENESS);
+   LATENESS,
+   lateOutputTag);
--- End diff --

wrong alignment


[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-10 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105397483
  
--- Diff: flink-core/src/main/java/org/apache/flink/util/OutputTag.java ---
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.util;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+
+/**
+ * An {@link OutputTag} is a typed and named tag to use for tagging side outputs
+ * of an operator.
+ *
+ * An {@code OutputTag} must always be an anonymous inner class so that Flink can derive
+ * a {@link TypeInformation} for the generic type parameter.
+ *
+ * Example:
+ * {@code
+ * OutputTag> info = new OutputTag>("late-data"){});
+ * }
+ *
+ * @param  the type of elements in the side-output stream.
+ */
+@PublicEvolving
+public class OutputTag implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   private final String id;
+
+   private transient TypeInformation typeInfo;
+
+   /**
+* Creates a new named {@code OutputTag} with the given id.
+*
+* @param id The id of the created {@code OutputTag}.
+ */
+   public OutputTag(String id) {
+   Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
+   this.id = requireNonNull(id);
+
+   try {
+   TypeHint typeHint =
+   new TypeHint(OutputTag.class, this, 0) {};
+   this.typeInfo = typeHint.getTypeInfo();
+   } catch (InvalidTypesException e) {
+   throw new InvalidTypesException("Could not determine TypeInformation for generic " +
+   "OutputTag type. Did you forget to make your OutputTag an anonymous inner class?", e);
+   }
+   }
+
+   /**
+* Creates a new named {@code OutputTag} with the given id and output {@link TypeInformation}.
+*
+* @param id The id of the created {@code OutputTag}.
+* @param typeInfo The {@code TypeInformation} for the side output.
+*/
+   public OutputTag(String id, TypeInformation typeInfo) {
+   this.id = Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
+   this.typeInfo =
--- End diff --

No need for line breaking.


[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-10 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105402899
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
 ---
@@ -1702,15 +1754,16 @@ public void testDropDueToLatenessSessionWithLatenessPurgingTrigger() throws Exce
stateDesc,
new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()),
PurgingTrigger.of(EventTimeTrigger.create()),
-   LATENESS);
+   LATENESS,
--- End diff --

wrong alignment


[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-10 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105401947
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
 ---
@@ -539,5 +625,26 @@ public void collect(StreamRecord record) {
// don't copy for the last output
outputs[outputs.length - 1].collect(record);
}
+
--- End diff --

The arguments can go on the same line as the method name.


[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-10 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105398138
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
 ---
@@ -17,13 +17,19 @@
 
 package org.apache.flink.streaming.examples.windowing;
 
--- End diff --

Unused imports.


[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-10 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105402678
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
 ---
@@ -1393,14 +1428,16 @@ public void testDropDueToLatenessTumbling() throws Exception {
stateDesc,
new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction>()),
EventTimeTrigger.create(),
-   LATENESS);
+   LATENESS,
+   lateOutputTag);
--- End diff --

wrong alignment


[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-10 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105397424
  
--- Diff: flink-core/src/main/java/org/apache/flink/util/OutputTag.java ---
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.util;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+
+/**
+ * An {@link OutputTag} is a typed and named tag to use for tagging side outputs
+ * of an operator.
+ *
+ * An {@code OutputTag} must always be an anonymous inner class so that Flink can derive
+ * a {@link TypeInformation} for the generic type parameter.
+ *
+ * Example:
+ * {@code
+ * OutputTag> info = new OutputTag>("late-data"){});
+ * }
+ *
+ * @param  the type of elements in the side-output stream.
+ */
+@PublicEvolving
+public class OutputTag implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   private final String id;
+
+   private transient TypeInformation typeInfo;
+
+   /**
+* Creates a new named {@code OutputTag} with the given id.
+*
+* @param id The id of the created {@code OutputTag}.
+ */
+   public OutputTag(String id) {
+   Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
+   this.id = requireNonNull(id);
+
+   try {
--- End diff --

No need for line breaking:
`TypeHint typeHint = new TypeHint(OutputTag.class, this, 0) {};`
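For context on why the anonymous inner class (and the `TypeHint` above) is needed: Java erases `T` at runtime unless a subclass fixes it in its generic superclass. The sketch below shows that mechanism in plain Java using a hypothetical `TypedTag` class; it is not Flink's `OutputTag`, and Flink's `TypeHint` path is only assumed to rely on the same information, not implemented this way.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

/**
 * Hypothetical tag class, NOT Flink's OutputTag. It only demonstrates the
 * "super type token" trick that makes the anonymous inner class necessary.
 */
class TypedTag<T> {

    private final String id;
    private final Type elementType;

    TypedTag(String id) {
        if (id == null) {
            throw new NullPointerException("TypedTag id cannot be null.");
        }
        this.id = id;

        // For `new TypedTag<String>("x") {}` the runtime class is an anonymous
        // subclass whose generic superclass is TypedTag<String>, so T is recoverable.
        Type superType = getClass().getGenericSuperclass();
        if (!(superType instanceof ParameterizedType)) {
            // For a plain `new TypedTag<String>("x")` the superclass is Object: T was erased.
            throw new IllegalStateException("Could not determine the element type. "
                    + "Did you forget to make your TypedTag an anonymous inner class?");
        }
        this.elementType = ((ParameterizedType) superType).getActualTypeArguments()[0];
    }

    String getId() {
        return id;
    }

    Type getElementType() {
        return elementType;
    }
}
```

With this sketch, `new TypedTag<String>("late-data") {}` resolves `String`, while `new TypedTag<String>("late-data")` is rejected, which mirrors the error message in the `OutputTag` constructor.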


[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-10 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105372676
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 ---
@@ -419,6 +435,14 @@ public void merge(W mergeResult,
registerCleanupTimer(window);
}
}
+
+   // side output input event if
+   // element not handled by any window
+   // late arriving tag has been set
+   // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
+   if(isSkippedElement && lateDataOutputTag != null && isLate(element)) {
--- End diff --

I just added a test for the behaviour with a "weird" `WindowAssigner`. 


[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-10 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105368360
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 ---
@@ -419,6 +435,14 @@ public void merge(W mergeResult,
registerCleanupTimer(window);
}
}
+
+   // side output input event if
+   // element not handled by any window
+   // late arriving tag has been set
+   // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
+   if(isSkippedElement && lateDataOutputTag != null && isLate(element)) {
--- End diff --

I thought about this again. I think it doesn't hurt to have it because it 
catches the case when a `WindowAssigner` doesn't assign any windows. In that 
case an element is also "skipped" but it is not necessarily considered late. 
What do you think?
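A standalone sketch of the routing decision being discussed, assuming `isLate()` means "event-time assigner and timestamp + allowed lateness <= current watermark" (the comment in the diff words this loosely). `LateDataRouting` and `Route` are hypothetical names, not `WindowOperator` code. It illustrates the case described above: an element that is skipped because no window was assigned, but is not late, is not sent to the late-data side output.

```java
/**
 * Hypothetical sketch of the late-data routing condition, NOT WindowOperator's code.
 * Assumes isLate() means: event-time assigner and
 * timestamp + allowedLateness <= current watermark.
 */
final class LateDataRouting {

    enum Route { PROCESSED, SIDE_OUTPUT_LATE, DROPPED }

    private LateDataRouting() {
    }

    static boolean isLate(boolean isEventTime, long timestamp, long allowedLateness, long currentWatermark) {
        return isEventTime && timestamp + allowedLateness <= currentWatermark;
    }

    static Route route(boolean isSkippedElement, boolean hasLateDataTag, boolean isEventTime,
                       long timestamp, long allowedLateness, long currentWatermark) {
        if (!isSkippedElement) {
            // At least one window accepted the element.
            return Route.PROCESSED;
        }
        if (hasLateDataTag && isLate(isEventTime, timestamp, allowedLateness, currentWatermark)) {
            return Route.SIDE_OUTPUT_LATE;
        }
        // Skipped but not late (e.g. the WindowAssigner returned no windows),
        // or no late-data tag configured: the element is simply dropped.
        return Route.DROPPED;
    }
}
```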


[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-09 Thread chenqin
Github user chenqin commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105235710
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 ---
@@ -419,6 +435,14 @@ public void merge(W mergeResult,
registerCleanupTimer(window);
}
}
+
+   // side output input event if
+   // element not handled by any window
+   // late arriving tag has been set
+   // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
+   if(isSkippedElement && lateDataOutputTag != null && isLate(element)) {
--- End diff --

Thanks @kl0u, good catch!

I put `isLate` there with the intention of filtering out events that are dropped for other reasons I may not be aware of. lateArrivingEvents should really be events that are both `late arriving` and `dropped`.

@aljoscha If that is a redundant check, we might just remove `isLate`. What do you think?
 


[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-09 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r105225561
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 ---
@@ -419,6 +435,14 @@ public void merge(W mergeResult,
registerCleanupTimer(window);
}
}
+
+   // side output input event if
+   // element not handled by any window
+   // late arriving tag has been set
+   // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
+   if(isSkippedElement && lateDataOutputTag != null && isLate(element)) {
--- End diff --

@chenqin and @aljoscha I am starting to review the PR and I was wondering when this new `isLate()` check is needed. At least for the out-of-the-box window assigners, it seems to be redundant.


[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-08 Thread chenqin
Github user chenqin commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r104997566
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 ---
@@ -60,6 +60,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import com.google.common.collect.Iterables;
--- End diff --

That sounds right, good catch! 
Thanks for fixing!


[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-08 Thread chenqin
Github user chenqin commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r104996349
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
 ---
@@ -85,6 +86,7 @@
private Set sources;
private Set sinks;
private Map> virtualSelectNodes;
+   private Map> virtualOutputNodes;
--- End diff --

sounds good


[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-08 Thread chenqin
Github user chenqin commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r104995971
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
 ---
@@ -333,32 +356,41 @@ public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int ty
downStreamVertexID,
typeNumber,
null,
-   new ArrayList());
+   new ArrayList(), null);
 
}
 
private void addEdgeInternal(Integer upStreamVertexID,
Integer downStreamVertexID,
int typeNumber,
StreamPartitioner partitioner,
-   List outputNames) {
-
+   List outputNames,
+   OutputTag outputTag) {
 
-   if (virtualSelectNodes.containsKey(upStreamVertexID)) {
+   if (virtualOutputNodes.containsKey(upStreamVertexID)) {
+   int virtualId = upStreamVertexID;
+   upStreamVertexID = virtualOutputNodes.get(virtualId).f0;
+   if (outputTag == null) {
+   // selections that happen downstream override earlier selections
--- End diff --

sounds good to me!


[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-08 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r104895136
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
 ---
@@ -333,32 +356,41 @@ public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int ty
downStreamVertexID,
typeNumber,
null,
-   new ArrayList());
+   new ArrayList(), null);
 
}
 
private void addEdgeInternal(Integer upStreamVertexID,
Integer downStreamVertexID,
int typeNumber,
StreamPartitioner partitioner,
-   List outputNames) {
-
+   List outputNames,
+   OutputTag outputTag) {
 
-   if (virtualSelectNodes.containsKey(upStreamVertexID)) {
+   if (virtualOutputNodes.containsKey(upStreamVertexID)) {
+   int virtualId = upStreamVertexID;
+   upStreamVertexID = virtualOutputNodes.get(virtualId).f0;
+   if (outputTag == null) {
+   // selections that happen downstream override earlier selections
--- End diff --

I think the comment is a leftover from copying this code from split/select. 
For side outputs it can't happen that you have multiple "selects" after one 
another. Will remove the comment. What do you think?
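A simplified model of the side-output branch of `addEdgeInternal()`, to illustrate why the "downstream selections override earlier selections" comment does not fit here: the tag is only taken from the virtual node when none is set yet. `VirtualSideOutputResolver` is hypothetical, uses `String` tags instead of `OutputTag`, and leaves out the select and partition branches.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical, simplified model of the side-output branch of addEdgeInternal().
 * NOT StreamGraph's code: tags are Strings and the select/partition branches are omitted.
 */
final class VirtualSideOutputResolver {

    /** A resolved edge; outputTag is null for the main output. */
    static final class Edge {
        final int upstreamId;
        final int downstreamId;
        final String outputTag;

        Edge(int upstreamId, int downstreamId, String outputTag) {
            this.upstreamId = upstreamId;
            this.downstreamId = downstreamId;
            this.outputTag = outputTag;
        }
    }

    // virtual node id -> real upstream node id (the f0 of the tuple in the diff)
    private final Map<Integer, Integer> virtualUpstream = new HashMap<>();
    // virtual node id -> side-output tag (the f1 of the tuple in the diff)
    private final Map<Integer, String> virtualTag = new HashMap<>();

    void addVirtualSideOutputNode(int originalId, int virtualId, String tag) {
        virtualUpstream.put(virtualId, originalId);
        virtualTag.put(virtualId, tag);
    }

    Edge addEdge(int upstreamId, int downstreamId) {
        return addEdgeInternal(upstreamId, downstreamId, null);
    }

    private Edge addEdgeInternal(int upstreamId, int downstreamId, String outputTag) {
        if (virtualUpstream.containsKey(upstreamId)) {
            int virtualId = upstreamId;
            if (outputTag == null) {
                // Only fill in the tag when none was set yet.
                outputTag = virtualTag.get(virtualId);
            }
            // Recurse in case the real upstream id is itself virtual.
            return addEdgeInternal(virtualUpstream.get(virtualId), downstreamId, outputTag);
        }
        return new Edge(upstreamId, downstreamId, outputTag);
    }
}
```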


[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-08 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r104880615
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
 ---
@@ -85,6 +86,7 @@
private Set sources;
private Set sinks;
private Map> virtualSelectNodes;
+   private Map> virtualOutputNodes;
--- End diff --

The method is already called `addVirtualSideOutputNode()`. I'm adjusting 
the name of the field. Thanks!


[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-08 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r104881269
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 ---
@@ -60,6 +60,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import com.google.common.collect.Iterables;
--- End diff --

You're right, I'm changing this to simply have two loops.

I think you introduced this in the first place, though. 😉 
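For illustration only, the "two loops" alternative to Guava's `Iterables.concat`, written as a hypothetical generic helper (`TwoLoops.forEachInBoth` is not a Flink method). It visits the elements in the same order a concatenated iterable would: all of the first collection, then all of the second.

```java
import java.util.function.Consumer;

/** Hypothetical helper, not a Flink method: two plain loops instead of Iterables.concat. */
final class TwoLoops {

    private TwoLoops() {
    }

    /** Applies action to every element of first, then to every element of second. */
    static <T> void forEachInBoth(Iterable<? extends T> first, Iterable<? extends T> second,
                                  Consumer<? super T> action) {
        for (T item : first) {
            action.accept(item);
        }
        for (T item : second) {
            action.accept(item);
        }
    }
}
```

In the actual code the two loops would most likely just be inlined; the point is that no extra dependency is needed for this.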


[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-08 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r104880412
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
 ---
@@ -416,4 +418,35 @@ private boolean canBeParallel() {
transformation.setSlotSharingGroup(slotSharingGroup);
return this;
}
+
+   /**
+* Gets the {@link DataStream} that contains the elements that are emitted from an operation
+* into the side output with the given {@link OutputTag}.
+*
+* Example:
+* {@code
+* static final OutputTag sideOutputTag = new OutputTag("side-output") {};
+*
+* public void flatMap(X value, Collector out) throws Exception {
--- End diff --

Fixing


[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-07 Thread chenqin
Github user chenqin commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r104846393
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
 ---
@@ -416,4 +418,35 @@ private boolean canBeParallel() {
transformation.setSlotSharingGroup(slotSharingGroup);
return this;
}
+
+   /**
+* Gets the {@link DataStream} that contains the elements that are emitted from an operation
+* into the side output with the given {@link OutputTag}.
+*
+* Example:
+* {@code
+* static final OutputTag sideOutputTag = new OutputTag("side-output") {};
+*
+* public void flatMap(X value, Collector out) throws Exception {
--- End diff --

The comments seem out of date; I think we already decided to get rid of CollectorWrapper.


[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-07 Thread chenqin
Github user chenqin commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r104847407
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
 ---
@@ -333,32 +356,41 @@ public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int ty
downStreamVertexID,
typeNumber,
null,
-   new ArrayList());
+   new ArrayList(), null);
 
}
 
private void addEdgeInternal(Integer upStreamVertexID,
Integer downStreamVertexID,
int typeNumber,
StreamPartitioner partitioner,
-   List outputNames) {
-
+   List outputNames,
+   OutputTag outputTag) {
 
-   if (virtualSelectNodes.containsKey(upStreamVertexID)) {
+   if (virtualOutputNodes.containsKey(upStreamVertexID)) {
+   int virtualId = upStreamVertexID;
+   upStreamVertexID = virtualOutputNodes.get(virtualId).f0;
+   if (outputTag == null) {
+   // selections that happen downstream override earlier selections
--- End diff --

We may want to call out this behavior in the `getSideOutput` comments.


[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-07 Thread chenqin
Github user chenqin commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r104847832
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 ---
@@ -439,6 +450,7 @@ public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
&& (edge.getPartitioner() instanceof ForwardPartitioner)
&& upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
+   && edge.getOutputTag() == null // disable chaining for side outputs
--- End diff --

I remember you mentioned that side outputs work with chaining in the latest version. Is that right, or not?
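To compare the two variants the question is about, here is a simplified sketch of the chaining predicate in the hunk with the side-output clause made switchable. `EdgeInfo` and `ChainingCheck` are hypothetical names; the real `isChainable()` also looks at chaining strategies and other conditions that are omitted here.

```java
// Hypothetical, simplified inputs to a chaining decision; the real
// isChainable() checks more conditions (chaining strategies and others).
final class EdgeInfo {
    final boolean forwardPartitioner;
    final int upstreamParallelism;
    final int downstreamParallelism;
    final String outputTag; // null means the main output

    EdgeInfo(boolean forwardPartitioner, int upstreamParallelism,
             int downstreamParallelism, String outputTag) {
        this.forwardPartitioner = forwardPartitioner;
        this.upstreamParallelism = upstreamParallelism;
        this.downstreamParallelism = downstreamParallelism;
        this.outputTag = outputTag;
    }
}

final class ChainingCheck {
    // The conjunction from the hunk, with the side-output clause made
    // switchable: sideOutputsChainable == false reproduces
    // "edge.getOutputTag() == null"; true drops that restriction.
    static boolean isChainable(EdgeInfo edge, boolean sideOutputsChainable) {
        return edge.forwardPartitioner
                && edge.upstreamParallelism == edge.downstreamParallelism
                && (sideOutputsChainable || edge.outputTag == null);
    }
}
```

Under the restrictive variant, a forward edge with equal parallelism still breaks the chain as soon as it carries a tag; the permissive variant treats tagged and untagged edges the same.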




[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-07 Thread chenqin
Github user chenqin commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r104847733
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 ---
@@ -60,6 +60,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import com.google.common.collect.Iterables;
--- End diff --

Do you think introducing this dependency (Guava's `Iterables`) is a good idea or a bad idea? Up to you :)




[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-07 Thread chenqin
Github user chenqin commented on a diff in the pull request:

https://github.com/apache/flink/pull/3484#discussion_r104847005
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
 ---
@@ -85,6 +86,7 @@
private Set sources;
private Set sinks;
private Map> virtualSelectNodes;
+   private Map> virtualOutputNodes;
--- End diff --

We might consider using the names `addVirtualSideOutputNode` and 
`virtualSideOutputNodes`, unless we want to refactor away from the current 
assumption `operator` to `<... operator <...`.




[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink

2017-03-07 Thread aljoscha
GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/3484

[FLINK-4460] Side Outputs in Flink

This is a refinement of #2982 by @chenqin.

I changed the API a bit, added support for side outputs to 
`ProcessFunction`, enabled side outputs to work with chaining, added a proper 
Scala API along with a Scala API test, and added documentation.
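For readers new to the feature, the core idea is that one function emits its regular results to the main output and other records, possibly of a different type, to side outputs identified by a tag. The plain-Java model below runs without Flink; `Tag`, `TaggedOutputs` and `SplitExample` are hypothetical names and do not reproduce the signatures of Flink's `OutputTag` or `ProcessFunction`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical side-output tag, identified by its id.
final class Tag<T> {
    final String id;

    Tag(String id) {
        this.id = id;
    }
}

// Hypothetical collector with one main output and any number of tagged outputs.
final class TaggedOutputs<T> {
    final List<T> main = new ArrayList<>();
    private final Map<String, List<Object>> side = new HashMap<>();

    void collect(T value) {
        main.add(value);
    }

    <X> void output(Tag<X> tag, X value) {
        side.computeIfAbsent(tag.id, k -> new ArrayList<>()).add(value);
    }

    // Returns what was emitted under the tag, or an empty list.
    @SuppressWarnings("unchecked")
    <X> List<X> getSideOutput(Tag<X> tag) {
        return (List<X>) (List<?>) side.getOrDefault(tag.id, new ArrayList<>());
    }
}

final class SplitExample {
    static final Tag<String> REJECTED = new Tag<>("rejected");

    // Even numbers go to the main output; odd numbers are reported as
    // strings on the side output, so the two outputs have different types.
    static TaggedOutputs<Integer> process(List<Integer> input) {
        TaggedOutputs<Integer> out = new TaggedOutputs<>();
        for (int value : input) {
            if (value % 2 == 0) {
                out.collect(value);
            } else {
                out.output(REJECTED, "odd: " + value);
            }
        }
        return out;
    }
}
```

Processing `[1, 2, 3, 4]` leaves `[2, 4]` on the main output and `["odd: 1", "odd: 3"]` on the `REJECTED` side output.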

R: @uce, @kl0u and @chenqin for review, please

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink finish-pr-2982-side-outputs-cp

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3484.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3484


commit 1746c7e5981942dfb0e57954f8241a400b699120
Author: Chen Qin 
Date:   2016-10-21T19:38:04Z

[FLINK-4460] Add support for side outputs

This does not yet allow users to emit to side outputs in user functions.
Only operators (StreamOperator) can emit to side outputs. A side output
can be retrieved on a SingleOutputStreamOperator.

commit 362fcb38abf525e704e97505186923ae77dc167b
Author: Aljoscha Krettek 
Date:   2016-10-21T19:38:04Z

[FLINK-4460] Add side outputs for ProcessFunction

commit 3ff4f8284691e9cbfba6f4c23b4e7fe1c584df50
Author: Aljoscha Krettek 
Date:   2017-02-16T13:09:26Z

[FLINK-4460] Make chaining work with side outputs

commit 5a48cdfb67a1008bf4c7cba8ff76b2982db55a40
Author: Aljoscha Krettek 
Date:   2017-02-16T13:41:10Z

[FLINK-4460] Expose OutputTag constructor that takes TypeInformation
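Background on why an explicit-type constructor helps: capturing a generic type argument at runtime requires an anonymous subclass (the `new OutputTag<String>("id") {}` idiom), and when that is awkward, passing the type directly is the alternative. The sketch below shows both paths with plain Java reflection; `TypedTag` is a hypothetical class and `Class<T>` stands in for Flink's `TypeInformation`.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical tag that records its element type, loosely modeled on the
// idea behind OutputTag; a Class stands in for Flink's TypeInformation.
class TypedTag<T> {
    final String id;
    final Type elementType;

    // Type-capturing constructor: only works when invoked through an
    // anonymous subclass such as new TypedTag<String>("x") {}, because only
    // then is the type argument recorded in the generic superclass.
    TypedTag(String id) {
        this.id = id;
        Type superclass = getClass().getGenericSuperclass();
        if (!(superclass instanceof ParameterizedType)) {
            throw new IllegalStateException(
                    "Create the tag as an anonymous subclass or pass the type explicitly");
        }
        this.elementType = ((ParameterizedType) superclass).getActualTypeArguments()[0];
    }

    // Explicit-type constructor: the analogue of passing TypeInformation.
    TypedTag(String id, Class<T> type) {
        this.id = id;
        this.elementType = type;
    }
}
```

`new TypedTag<String>("a") {}` records `String.class`, plain `new TypedTag<String>("a")` fails because erasure leaves nothing to read, and `new TypedTag<>("b", Integer.class)` works without any subclassing.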

commit 935ff90dadad380db293265cc49d072d764cf510
Author: Chen Qin 
Date:   2017-03-01T14:36:17Z

[FLINK-4460] Provide late-data output for window operations

We use side outputs to emit dropped late data.
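The late-data commit reuses side outputs for records that a window operator would otherwise drop. Below is a rough model under stated assumptions: tumbling event-time windows, non-negative timestamps, and a lateness rule loosely following Flink's (a window expires once its last timestamp plus the allowed lateness is at or below the watermark). `LateDataRouter` is a hypothetical class, and its two lists stand in for the main output and the late-data side output.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a tumbling event-time window operator that routes
// late records to a separate list instead of silently dropping them.
final class LateDataRouter {
    final long windowSize;
    final long allowedLateness;
    final List<Long> onTime = new ArrayList<>();
    final List<Long> late = new ArrayList<>();

    LateDataRouter(long windowSize, long allowedLateness) {
        this.windowSize = windowSize;
        this.allowedLateness = allowedLateness;
    }

    // Assumes non-negative timestamps.
    void process(long timestamp, long currentWatermark) {
        long windowStart = timestamp - (timestamp % windowSize);
        long windowMaxTimestamp = windowStart + windowSize - 1;
        long cleanupTime = windowMaxTimestamp + allowedLateness;
        if (cleanupTime <= currentWatermark) {
            late.add(timestamp);   // would be dropped without a late-data output
        } else {
            onTime.add(timestamp);
        }
    }
}
```

With a window size of 10 and no allowed lateness, a record at timestamp 7 arriving after watermark 12 belongs to the already-expired window `[0, 10)` and goes to the late list; allowing a lateness of 5 keeps it on time until the watermark reaches 14.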

commit 3ae8a673c6c29fe2e110f5745ddf72deae71aafd
Author: Aljoscha Krettek 
Date:   2017-03-07T10:06:31Z

[FLINK-4460] Add proper side output API for Scala API

The Scala side output API uses context bounds to get a TypeInformation
for an OutputTag. This also adds a SideOutputITCase for the Scala API.

commit 66182b79931016a04286279f5aba63af30442b02
Author: Aljoscha Krettek 
Date:   2017-03-07T11:06:01Z

[FLINK-4460] Add documentation for side outputs



