[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15932258#comment-15932258 ] ASF GitHub Bot commented on FLINK-4460: --- Github user jgrier commented on the issue: https://github.com/apache/flink/pull/2982 Nice :) > Side Outputs in Flink > - > > Key: FLINK-4460 > URL: https://issues.apache.org/jira/browse/FLINK-4460 > Project: Flink > Issue Type: New Feature > Components: Core, DataStream API >Affects Versions: 1.2.0, 1.1.3 >Reporter: Chen Qin >Assignee: Chen Qin > Labels: latearrivingevents, sideoutput > Fix For: 1.3.0 > > > https://docs.google.com/document/d/1vg1gpR8JL4dM07Yu4NyerQhhVvBlde5qdqnuJv4LcV4/edit?usp=sharing -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15931107#comment-15931107 ] ASF GitHub Bot commented on FLINK-4460: --- Github user chenqin closed the pull request at: https://github.com/apache/flink/pull/2982
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15931106#comment-15931106 ] ASF GitHub Bot commented on FLINK-4460: --- Github user chenqin commented on the issue: https://github.com/apache/flink/pull/2982 Wow, finally in! It was fun, and thanks for your help along every step of the journey! Chen
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15931104#comment-15931104 ] ASF GitHub Bot commented on FLINK-4460: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/2982 @chenqin I finally merged it. Could you please also close this PR? And thanks again for working on this for so long!
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15931103#comment-15931103 ] ASF GitHub Bot commented on FLINK-4460: --- Github user aljoscha closed the pull request at: https://github.com/apache/flink/pull/3484
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930241#comment-15930241 ] ASF GitHub Bot commented on FLINK-4460: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/3484 Thanks for reviewing again, @kl0u! I incorporated all your suggestions. I'm now waiting for Travis to give the green light, and then I'll merge. @chenqin A lot of thanks also to you for working on this and pushing it with me!
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930220#comment-15930220 ] ASF GitHub Bot commented on FLINK-4460: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r106685158 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java --- @@ -48,15 +49,25 @@ * output selection). */ private final List selectedNames; + + /** +* The side-output tag (if any) of this {@link StreamEdge}. +*/ + private final OutputTag outputTag; + + /** +* The {@link StreamPartitioner} on this {@link StreamEdge}. +*/ private StreamPartitioner outputPartitioner; public StreamEdge(StreamNode sourceVertex, StreamNode targetVertex, int typeNumber, - List selectedNames, StreamPartitioner outputPartitioner) { + List selectedNames, StreamPartitioner outputPartitioner, OutputTag outputTag) { this.sourceVertex = sourceVertex; this.targetVertex = targetVertex; this.typeNumber = typeNumber; this.selectedNames = selectedNames; this.outputPartitioner = outputPartitioner; + this.outputTag = outputTag; --- End diff -- I'm not sure what exactly the edge id does or who uses it, so I'd prefer not to touch it for now.
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930214#comment-15930214 ] ASF GitHub Bot commented on FLINK-4460: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r106684413 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java --- @@ -567,6 +600,17 @@ protected boolean isLate(W window) { } /** +* Decide if a record is currently late, based on current watermark and allowed lateness. +* +* @param element The element to check +* @return The element for which should be considered when sideoutputs +*/ + protected boolean isLate(StreamRecord element){ --- End diff -- That is what I remember as well.
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15930204#comment-15930204 ] ASF GitHub Bot commented on FLINK-4460: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r106684096 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java --- @@ -567,6 +600,17 @@ protected boolean isLate(W window) { } /** +* Decide if a record is currently late, based on current watermark and allowed lateness. +* +* @param element The element to check +* @return The element for which should be considered when sideoutputs +*/ + protected boolean isLate(StreamRecord element){ --- End diff -- I must have removed the check by accident. I think we agreed to rename this to something more meaningful and keep it, right?
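The per-element check discussed here asks whether a single record, rather than a whole window, arrives too late to be assigned to any window. A minimal standalone sketch follows, assuming event-time semantics and assuming that an element counts as late once its timestamp plus the allowed lateness is at or below the current watermark. `LatenessSketch` and `isElementLate` are hypothetical names, not the `WindowOperator` code under review.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the per-element lateness check discussed for WindowOperator. */
final class LatenessSketch {
    private final boolean isEventTime;   // lateness is only meaningful in event time
    private final long allowedLateness;  // in milliseconds
    private long currentWatermark = Long.MIN_VALUE;

    LatenessSketch(boolean isEventTime, long allowedLateness) {
        this.isEventTime = isEventTime;
        this.allowedLateness = allowedLateness;
    }

    /** Watermarks only move forward. */
    void advanceWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
    }

    /** Assumed rule: late if, even with the allowed lateness, the watermark has already passed it. */
    boolean isElementLate(long elementTimestamp) {
        return isEventTime && elementTimestamp + allowedLateness <= currentWatermark;
    }

    /** Routes on-time timestamps to mainOut and late ones to lateOut (the "side output"). */
    void route(long[] timestamps, List<Long> mainOut, List<Long> lateOut) {
        for (long ts : timestamps) {
            if (isElementLate(ts)) {
                lateOut.add(ts);
            } else {
                mainOut.add(ts);
            }
        }
    }

    public static void main(String[] args) {
        LatenessSketch op = new LatenessSketch(true, 1_000L);
        op.advanceWatermark(10_000L);
        List<Long> main = new ArrayList<>();
        List<Long> late = new ArrayList<>();
        op.route(new long[] {8_500L, 9_000L, 9_500L, 12_000L}, main, late);
        System.out.println("main = " + main + ", late = " + late);
    }
}
```

With a watermark of 10000 and an allowed lateness of 1000, this sketch sends 8500 and 9000 to the late list and keeps 9500 and 12000 on the main output.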
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926168#comment-15926168 ] ASF GitHub Bot commented on FLINK-4460: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r106163521 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java --- @@ -63,6 +66,7 @@ private TypeSerializer typeSerializerIn1; private TypeSerializer typeSerializerIn2; private TypeSerializer typeSerializerOut; + private Map typeSerializerMap; --- End diff -- This is not used anywhere in the code. Can it be removed, along with the `getTypeSerializerOut()` and `setTypeSerializerOut()`?
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926171#comment-15926171 ] ASF GitHub Bot commented on FLINK-4460: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r106162497 --- Diff: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/sideoutput/SideOutputExample.java --- @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.examples.sideoutput; + +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.util.OutputTag; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.examples.java.wordcount.util.WordCountData; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.Collector; + +/** + * An example that illustrates the use of side outputs. + * + * This is a modified version of {@link org.apache.flink.streaming.examples.windowing.WindowWordCount} + * that has a filter in the tokenizer and only emits some words for counting + * while emitting the other words to a side output. + */ +public class SideOutputExample { + + /** +* We need to create an {@link OutputTag} so that we can reference it when emitting +* data to a side output and also to retrieve the side output stream from an operation. +*/ + static final OutputTag rejectedWordsTag = new OutputTag("rejected") {}; --- End diff -- Here we add a side output but do nothing to show that it works. We could probably add a prefix "rejected-" to each record and print it, so that the user can see what the side output does.
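One way to act on this suggestion is to print every side-output record with a `rejected-` prefix. The plain-Java sketch below imitates the shape of a tokenizer that sends accepted words to the main output and the rest to a tagged side output. It does not use Flink. `Tag`, `Outputs`, and the rule that words longer than five characters are rejected are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, Flink-free imitation of a tokenizer with one main output and one side output. */
final class SideOutputWordSketch {

    /** Stand-in for OutputTag: two tags with the same id are equal. */
    record Tag(String id) {}

    /** Collects main-output records and side-output records keyed by tag. */
    static final class Outputs {
        final List<String> main = new ArrayList<>();
        final Map<Tag, List<String>> side = new HashMap<>();

        void collect(String value) {
            main.add(value);
        }

        void output(Tag tag, String value) {
            side.computeIfAbsent(tag, t -> new ArrayList<>()).add(value);
        }

        List<String> getSideOutput(Tag tag) {
            return side.getOrDefault(tag, List.of());
        }
    }

    static final Tag REJECTED = new Tag("rejected");

    /** Splits a line into lowercase words; assumed rule: words longer than 5 chars are rejected. */
    static void tokenize(String line, Outputs out) {
        for (String token : line.toLowerCase().split("\\W+")) {
            if (token.isEmpty()) {
                continue;
            }
            if (token.length() > 5) {
                out.output(REJECTED, token);  // side output
            } else {
                out.collect(token);           // main output
            }
        }
    }

    public static void main(String[] args) {
        Outputs out = new Outputs();
        tokenize("To be, or not to be: that is the question", out);
        System.out.println("main: " + out.main);
        // Make the side output visible, as suggested in the review.
        for (String word : out.getSideOutput(REJECTED)) {
            System.out.println("rejected-" + word);
        }
    }
}
```

The `Tag` record (Java 16 or later) compares by id. A second `new Tag("rejected")` therefore retrieves the same side output, much like the id-based equality of `OutputTag`.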
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926170#comment-15926170 ] ASF GitHub Bot commented on FLINK-4460: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r106163045 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java --- @@ -333,32 +373,39 @@ public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int ty downStreamVertexID, typeNumber, null, - new ArrayList()); + new ArrayList(), null); --- End diff -- The `null` should go to the next line for uniformity.
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926169#comment-15926169 ] ASF GitHub Bot commented on FLINK-4460: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r106162749 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java --- @@ -48,15 +49,25 @@ * output selection). */ private final List selectedNames; + + /** +* The side-output tag (if any) of this {@link StreamEdge}. +*/ + private final OutputTag outputTag; + + /** +* The {@link StreamPartitioner} on this {@link StreamEdge}. +*/ private StreamPartitioner outputPartitioner; public StreamEdge(StreamNode sourceVertex, StreamNode targetVertex, int typeNumber, - List selectedNames, StreamPartitioner outputPartitioner) { + List selectedNames, StreamPartitioner outputPartitioner, OutputTag outputTag) { this.sourceVertex = sourceVertex; this.targetVertex = targetVertex; this.typeNumber = typeNumber; this.selectedNames = selectedNames; this.outputPartitioner = outputPartitioner; + this.outputTag = outputTag; --- End diff -- Does it make sense to also add the outputTag to the `edgeId`?
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15925916#comment-15925916 ] ASF GitHub Bot commented on FLINK-4460: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r106138342 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java --- @@ -416,4 +428,26 @@ private boolean canBeParallel() { transformation.setSlotSharingGroup(slotSharingGroup); return this; } + + /** +* Gets the {@link DataStream} that contains the elements that are emitted from an operation +* into the side output with the given {@link OutputTag}. +* +* @see org.apache.flink.streaming.api.functions.ProcessFunction.Context#output(OutputTag, Object) +*/ + public DataStream getSideOutput(OutputTag sideOutputTag){ + sideOutputTag = clean(sideOutputTag); --- End diff -- I think it is better not to reuse the argument variable but to create a new one.
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15925923#comment-15925923 ] ASF GitHub Bot commented on FLINK-4460: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r106139311 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java --- @@ -326,33 +327,46 @@ public int getChainLength() { Map chainedConfigs, ClassLoader userCodeClassloader, Map streamOutputs, - List allOperators) + List allOperators, + OutputTag outputTag) { // create the output that the operator writes to first. this may recursively create more operators Output output = createOutputCollector( containingTask, operatorConfig, chainedConfigs, userCodeClassloader, streamOutputs, allOperators); // now create the operator and give it the output collector to write its output to OneInputStreamOperator chainedOperator = operatorConfig.getStreamOperator(userCodeClassloader); + chainedOperator.setup(containingTask, operatorConfig, output); allOperators.add(chainedOperator); if (containingTask.getExecutionConfig().isObjectReuseEnabled()) { - return new ChainingOutput<>(chainedOperator, this); + return new ChainingOutput<>(chainedOperator, this, outputTag); } else { TypeSerializer inSerializer = operatorConfig.getTypeSerializerIn1(userCodeClassloader); - return new CopyingChainingOutput<>(chainedOperator, inSerializer, this); + return new CopyingChainingOutput<>(chainedOperator, inSerializer, outputTag, this); } } private RecordWriterOutput createStreamOutput( StreamEdge edge, StreamConfig upStreamConfig, int outputIndex, Environment taskEnvironment, - String taskName) - { - TypeSerializer outSerializer = upStreamConfig.getTypeSerializerOut(taskEnvironment.getUserClassLoader()); + String taskName) { + OutputTag sideOutputTag = edge.getOutputTag(); // OutputTag, return null if not sideOutput + + TypeSerializer outSerializer =
null; + + if (edge.getOutputTag() != null) { + // side output + outSerializer = upStreamConfig.getTypeSerializerSideOut( + edge.getOutputTag(), taskEnvironment.getUserClassLoader()); + } else { + // main output --- End diff -- this can become one line.
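One reading of "this can become one line" is that the `if`/`else` choosing between the side-output serializer and the main-output serializer can collapse into a single conditional expression. The sketch below models only that branch. `EdgeStub`, `ConfigStub`, and the string placeholders for serializers are hypothetical. The method names only echo those in the diff and are not Flink's signatures.

```java
import java.util.Map;

/** Hypothetical stand-ins that model only the serializer-selection branch from the diff. */
final class SerializerSelectionSketch {

    /** An edge carries an output tag id, or null for the main output. */
    static final class EdgeStub {
        private final String outputTag;
        EdgeStub(String outputTag) { this.outputTag = outputTag; }
        String getOutputTag() { return outputTag; }
    }

    /** Knows the main-output serializer and one serializer per side-output tag (strings here). */
    static final class ConfigStub {
        private final String mainSerializer;
        private final Map<String, String> sideSerializers;
        ConfigStub(String mainSerializer, Map<String, String> sideSerializers) {
            this.mainSerializer = mainSerializer;
            this.sideSerializers = sideSerializers;
        }
        String getTypeSerializerOut() { return mainSerializer; }
        String getTypeSerializerSideOut(String tag) { return sideSerializers.get(tag); }
    }

    /** Single-expression form: the side-output serializer if the edge has a tag, else the main one. */
    static String selectSerializer(EdgeStub edge, ConfigStub config) {
        String tag = edge.getOutputTag();  // null if this is not a side-output edge
        return tag != null ? config.getTypeSerializerSideOut(tag) : config.getTypeSerializerOut();
    }

    public static void main(String[] args) {
        ConfigStub config = new ConfigStub("Tuple2Serializer", Map.of("rejected", "StringSerializer"));
        System.out.println(selectSerializer(new EdgeStub(null), config));
        System.out.println(selectSerializer(new EdgeStub("rejected"), config));
    }
}
```

The sketch reads the tag into a local variable once and reuses it. By contrast, the diff declares `sideOutputTag` and then calls `edge.getOutputTag()` again.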
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15925918#comment-15925918 ] ASF GitHub Bot commented on FLINK-4460: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r106138237 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java --- @@ -416,4 +428,26 @@ private boolean canBeParallel() { transformation.setSlotSharingGroup(slotSharingGroup); return this; } + + /** +* Gets the {@link DataStream} that contains the elements that are emitted from an operation +* into the side output with the given {@link OutputTag}. +* +* @see org.apache.flink.streaming.api.functions.ProcessFunction.Context#output(OutputTag, Object) +*/ --- End diff -- Missing space between the ) and the {
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15925917#comment-15925917 ] ASF GitHub Bot commented on FLINK-4460: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r106138473 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java --- @@ -416,4 +428,26 @@ private boolean canBeParallel() { transformation.setSlotSharingGroup(slotSharingGroup); return this; } + + /** +* Gets the {@link DataStream} that contains the elements that are emitted from an operation +* into the side output with the given {@link OutputTag}. +* +* @see org.apache.flink.streaming.api.functions.ProcessFunction.Context#output(OutputTag, Object) +*/ + public DataStream getSideOutput(OutputTag sideOutputTag){ + sideOutputTag = clean(sideOutputTag); + + TypeInformation type = requestedSideOutputs.get(sideOutputTag); + if (type != null && !type.equals(sideOutputTag.getTypeInfo())) { + throw new UnsupportedOperationException("A side output with a matching id was " + + "already requested with a different type. This is not allowed, side output " + + "ids need to be unique."); + } + + requestedSideOutputs.put(sideOutputTag, sideOutputTag.getTypeInfo()); + --- End diff -- The `requireNonNull` should be at the beginning of the method.
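The three review points on `getSideOutput` can be combined in a small Flink-free sketch: check for `null` first, clean the tag into a new local variable instead of reassigning the parameter, and reject a second request for the same id with a different type. `GetSideOutputSketch` and its `Tag` class are assumptions for illustration. So are the use of `Class<?>` in place of `TypeInformation` and the identity `clean` method.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical, Flink-free model of the getSideOutput checks discussed in the review. */
final class GetSideOutputSketch {

    /** Stand-in for OutputTag: equal if ids are equal; the element type is carried as a Class. */
    static final class Tag {
        final String id;
        final Class<?> type;

        Tag(String id, Class<?> type) {
            this.id = Objects.requireNonNull(id, "id");
            this.type = Objects.requireNonNull(type, "type");
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof Tag && ((Tag) o).id.equals(id);
        }

        @Override
        public int hashCode() {
            return id.hashCode();
        }
    }

    private final Map<Tag, Class<?>> requestedSideOutputs = new HashMap<>();

    /** Stand-in for the closure cleaner; here it simply returns its argument. */
    private static Tag clean(Tag tag) {
        return tag;
    }

    String getSideOutput(Tag sideOutputTag) {
        // Fail fast: validate the argument before doing anything else.
        Objects.requireNonNull(sideOutputTag, "sideOutputTag");
        // Use a new local variable instead of reassigning the parameter.
        Tag cleanedTag = clean(sideOutputTag);

        Class<?> previous = requestedSideOutputs.get(cleanedTag);
        if (previous != null && !previous.equals(cleanedTag.type)) {
            throw new UnsupportedOperationException("A side output with a matching id was "
                    + "already requested with a different type. This is not allowed, side output "
                    + "ids need to be unique.");
        }
        requestedSideOutputs.put(cleanedTag, cleanedTag.type);
        return "side output '" + cleanedTag.id + "' of type " + cleanedTag.type.getSimpleName();
    }

    public static void main(String[] args) {
        GetSideOutputSketch op = new GetSideOutputSketch();
        System.out.println(op.getSideOutput(new Tag("late", String.class)));
        try {
            op.getSideOutput(new Tag("late", Long.class));
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```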
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15925925#comment-15925925 ] ASF GitHub Bot commented on FLINK-4460: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r106138895 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SideOutputTransformation.java --- @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.transformations; + +import static java.util.Objects.requireNonNull; + +import com.google.common.collect.Lists; +import org.apache.flink.util.OutputTag; +import org.apache.flink.streaming.api.operators.ChainingStrategy; + +import java.util.Collection; +import java.util.List; + + +/** + * This transformation represents a selection of a side output of an upstream operation with a + * given {@link OutputTag}. + * + * This does not create a physical operation, it only affects how upstream operations are + * connected to downstream operations. 
+ * + * @param The type of the elements that result from this {@code SideOutputTransformation} + */ --- End diff -- Here we do not check whether `input` is `null` (only the caller does), but we already try to get its parallelism. We could pass the parallelism as a separate argument and then, after the `super()` call, check whether the input is null. This makes the class self-contained, because you do not have to look at other classes to see whether `input` can be `null`. What do you think?
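Java forbids statements before `super(...)`. A constructor that passes `input.getParallelism()` to `super` therefore dereferences `input` before the class can validate it. The sketch below follows the reviewer's proposal: the parallelism becomes its own argument, and `input` is checked right after `super`. `BaseTransformation`, `SourceStub`, and `SideOutputTransformationSketch` are hypothetical and model only the relevant fields.

```java
import java.util.Objects;

/** Hypothetical base class; models only the fields relevant to the review comment. */
abstract class BaseTransformation {
    final String name;
    final int parallelism;

    BaseTransformation(String name, int parallelism) {
        this.name = name;
        this.parallelism = parallelism;
    }
}

/** Hypothetical upstream transformation used as an input. */
final class SourceStub extends BaseTransformation {
    SourceStub(String name, int parallelism) {
        super(name, parallelism);
    }
}

/** Hypothetical side-output selection that validates its own input instead of relying on callers. */
final class SideOutputTransformationSketch extends BaseTransformation {
    final BaseTransformation input;
    final String tagId;

    SideOutputTransformationSketch(BaseTransformation input, String tagId, int parallelism) {
        // Parallelism arrives as its own argument, so nothing is dereferenced before super().
        super("SideOutput", parallelism);
        this.input = Objects.requireNonNull(input, "input");
        this.tagId = Objects.requireNonNull(tagId, "tagId");
    }

    /** Convenience factory that inherits the parallelism of a non-null input. */
    static SideOutputTransformationSketch of(BaseTransformation input, String tagId) {
        Objects.requireNonNull(input, "input");
        return new SideOutputTransformationSketch(input, tagId, input.parallelism);
    }

    public static void main(String[] args) {
        SideOutputTransformationSketch t = of(new SourceStub("words", 4), "rejected");
        System.out.println(t.name + " of " + t.input.name + " with parallelism " + t.parallelism);
    }
}
```

An alternative that keeps a single constructor argument is `super(..., Objects.requireNonNull(input).getParallelism())`. This works because a static method call is allowed inside the `super` argument list.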
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15925924#comment-15925924 ] ASF GitHub Bot commented on FLINK-4460: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r106138062 --- Diff: flink-core/src/main/java/org/apache/flink/util/OutputTag.java --- @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.util; + +import static java.util.Objects.requireNonNull; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.Serializable; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; + + +/** + * An {@link OutputTag} is a typed and named tag to use for tagging side outputs + * of an operator. + * + * An {@code OutputTag} must always be an anonymous inner class so that Flink can derive + * a {@link TypeInformation} for the generic type parameter. 
+ * + * Example: + * {@code + * OutputTag> info = new OutputTag >("late-data"){}); + * } + * + * @param the type of elements in the side-output stream. + */ +@PublicEvolving +public class OutputTag implements Serializable { + + private static final long serialVersionUID = 1L; + + private final String id; + + private transient TypeInformation typeInfo; + + /** +* Creates a new named {@code OutputTag} with the given id. +* +* @param id The id of the created {@code OutputTag}. + */ + public OutputTag(String id) { + Preconditions.checkNotNull(id, "OutputTag id cannot be null."); + this.id = requireNonNull(id); + + try { + TypeHint typeHint = + new TypeHint(OutputTag.class, this, 0) {}; + this.typeInfo = typeHint.getTypeInfo(); + } catch (InvalidTypesException e) { + throw new InvalidTypesException("Could not determine TypeInformation for generic " + + "OutputTag type. Did you forget to make your OutputTag an anonymous inner class?", e); + } + } + + /** +* Creates a new named {@code OutputTag} with the given id and output {@link TypeInformation}. +* +* @param id The id of the created {@code OutputTag}. +* @param typeInfo The {@code TypeInformation} for the side output. +*/ + public OutputTag(String id, TypeInformation typeInfo) { + this.id = Preconditions.checkNotNull(id, "OutputTag id cannot be null."); + this.typeInfo = + Preconditions.checkNotNull(typeInfo, "TypeInformation cannot be null."); + } + + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + in.defaultReadObject(); + typeInfo = null; + } + + public String getId() { + return id; + } + + public TypeInformation getTypeInfo() { + return typeInfo; + } + + @Override + public boolean equals(Object obj) { + return obj instanceof OutputTag --- End diff -- Still the first comment applies: the `equals` can be simplified given that `id != null`. 
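The `OutputTag` diff relies on two ideas that can be shown with the JDK alone:

- The tag must be an anonymous subclass, so that its generic argument survives erasure in the class's generic superclass.
- `equals` and `hashCode` can depend on `id` alone once the constructor guarantees that `id` is non-null.

`TypedTag` is a hypothetical stand-in. Flink derives a `TypeInformation` through `TypeHint`, which this sketch replaces with plain `java.lang.reflect` calls.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Objects;

/** Hypothetical stand-in for OutputTag: named, typed, and equal by id alone. */
class TypedTag<T> {
    private final String id;
    private final Type elementType;

    TypedTag(String id) {
        this.id = Objects.requireNonNull(id, "TypedTag id cannot be null.");
        // Only a subclass such as `new TypedTag<String>("x") {}` records T in its generic
        // superclass; a plain `new TypedTag<String>("x")` has Object as superclass.
        Type superType = getClass().getGenericSuperclass();
        if (!(superType instanceof ParameterizedType)) {
            throw new IllegalArgumentException("Could not determine the element type. "
                    + "Did you forget to make your tag an anonymous inner class?");
        }
        this.elementType = ((ParameterizedType) superType).getActualTypeArguments()[0];
    }

    String getId() {
        return id;
    }

    Type getElementType() {
        return elementType;
    }

    // id is guaranteed non-null by the constructor, so no null handling is needed here.
    @Override
    public boolean equals(Object obj) {
        return obj instanceof TypedTag && ((TypedTag<?>) obj).id.equals(id);
    }

    @Override
    public int hashCode() {
        return id.hashCode();
    }

    public static void main(String[] args) {
        TypedTag<String> late = new TypedTag<String>("late-data") {};
        System.out.println(late.getId() + " -> " + late.getElementType().getTypeName());
        try {
            new TypedTag<String>("late-data");
        } catch (IllegalArgumentException e) {
            System.out.println("non-anonymous tag rejected");
        }
    }
}
```

Here, two tags with the same id but different element types compare equal. A type conflict therefore has to be caught elsewhere, which is what the duplicate-id check in `getSideOutput` does.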
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15925919#comment-15925919 ] ASF GitHub Bot commented on FLINK-4460: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r106139079 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java --- @@ -567,6 +600,17 @@ protected boolean isLate(W window) { } /** +* Decide if a record is currently late, based on current watermark and allowed lateness. +* +* @param element The element to check +* @return The element for which should be considered when sideoutputs +*/ + protected boolean isLate(StreamRecord element){ --- End diff -- This is not used any more, right? So it can be deleted.
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15925921#comment-15925921 ] ASF GitHub Bot commented on FLINK-4460: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r106139593 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java --- @@ -441,26 +486,53 @@ public void close() { private static final class CopyingChainingOutput extends ChainingOutput { private final TypeSerializer serializer; - + public CopyingChainingOutput( OneInputStreamOperatoroperator, TypeSerializer serializer, + OutputTag outputTag, StreamStatusProvider streamStatusProvider) { - super(operator, streamStatusProvider); + super(operator, streamStatusProvider, outputTag); this.serializer = serializer; } @Override public void collect(StreamRecord record) { + if (this.outputTag != null) { + // we are only responsible for emitting to the main input + return; + } + + pushToOperator(record); + } + + @Override + public void collect(OutputTag outputTag, StreamRecord record) { + if (this.outputTag == null || !this.outputTag.equals(outputTag)) { + // we are only responsible for emitting to the side-output specified by our + // OutputTag. + return; + } + + pushToOperator(record); + } + + @Override + protected void pushToOperator(StreamRecord record) { try { + // we know that the given outputTag matches our OutputTag so the record + // must be of the type that our operator (and Serializer) expects. 
+ @SuppressWarnings("unchecked") + StreamRecord castRecord = (StreamRecord) record; + numRecordsIn.inc(); - StreamRecord copy = record.copy(serializer.copy(record.getValue())); + StreamRecord copy = castRecord.copy(serializer.copy(castRecord.getValue())); operator.setKeyContextElement1(copy); - operator.processElement(copy); - } - catch (Exception e) { + operator.processElement(castRecord); --- End diff -- This should be `copy`, not `castRecord`. > Side Outputs in Flink > - > > Key: FLINK-4460 > URL: https://issues.apache.org/jira/browse/FLINK-4460 > Project: Flink > Issue Type: New Feature > Components: Core, DataStream API >Affects Versions: 1.2.0, 1.1.3 >Reporter: Chen Qin >Assignee: Chen Qin > Labels: latearrivingevents, sideoutput > > https://docs.google.com/document/d/1vg1gpR8JL4dM07Yu4NyerQhhVvBlde5qdqnuJv4LcV4/edit?usp=sharing -- This message was sent by Atlassian JIRA (v6.3.15#6346)
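`CopyingChainingOutput` exists so that the chained operator receives a copy of each record. Passing `castRecord` instead of `copy` to `processElement` defeats that purpose. Below is a minimal sketch of the intended behavior using a hypothetical `CopyingForwarder`. In it, a `UnaryOperator` stands in for `serializer.copy(...)` and a `Consumer` stands in for the chained operator.

```java
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for CopyingChainingOutput: copies each value before
// handing it downstream, so the downstream consumer may mutate its input
// without affecting the producer or any other consumer.
final class CopyingForwarder<T> {
    private final UnaryOperator<T> copier;   // role of serializer.copy(...)
    private final Consumer<T> downstream;    // role of operator.processElement(...)

    CopyingForwarder(UnaryOperator<T> copier, Consumer<T> downstream) {
        this.copier = copier;
        this.downstream = downstream;
    }

    void collect(T value) {
        T copy = copier.apply(value);
        // Forward the copy, not the original value.
        downstream.accept(copy);
    }
}
```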
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15925922#comment-15925922 ] ASF GitHub Bot commented on FLINK-4460: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r106139449 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java --- @@ -387,18 +401,49 @@ public int getChainLength() { protected final StreamStatusProvider streamStatusProvider; - public ChainingOutput(OneInputStreamOperatoroperator, StreamStatusProvider streamStatusProvider) { + protected final OutputTag outputTag; + + public ChainingOutput( + OneInputStreamOperator operator, + StreamStatusProvider streamStatusProvider, + OutputTag outputTag) { this.operator = operator; this.numRecordsIn = ((OperatorMetricGroup) operator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter(); this.streamStatusProvider = streamStatusProvider; + this.outputTag = outputTag; } @Override public void collect(StreamRecord record) { + if (this.outputTag != null) { + // we are only responsible for emitting to the main input + return; + } + + pushToOperator(record); + } + + @Override + public void collect(OutputTag outputTag, StreamRecord record) { + if (this.outputTag == null || !this.outputTag.equals(outputTag)) { + // we are only responsible for emitting to the side-output specified by our + // OutputTag. + return; + } + + pushToOperator(record); + } + --- End diff -- This can become `private` as the copying alternative has its own implementation, right? 
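The two quoted `collect` overloads implement a simple routing rule. An output created without a tag accepts only main-output records. An output created with a tag accepts only records emitted under an equal tag. Both overloads end in the same push, so that push can be a private helper, as suggested. The following is a minimal sketch with hypothetical names: `TaggedOutput`, with a plain `String` tag in place of `OutputTag`.

```java
import java.util.function.Consumer;

// Hypothetical stand-in for ChainingOutput's routing rule.
final class TaggedOutput<T> {
    private final String outputTag;     // null means "this output is the main output"
    private final Consumer<T> operator;

    TaggedOutput(String outputTag, Consumer<T> operator) {
        this.outputTag = outputTag;
        this.operator = operator;
    }

    // Main-output records are forwarded only if this output has no tag.
    void collect(T record) {
        if (outputTag != null) {
            return;
        }
        pushToOperator(record);
    }

    // Side-output records are forwarded only if the tags match.
    void collect(String tag, T record) {
        if (outputTag == null || !outputTag.equals(tag)) {
            return;
        }
        pushToOperator(record);
    }

    // Shared by both overloads; nothing outside this class needs to call it.
    private void pushToOperator(T record) {
        operator.accept(record);
    }
}
```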
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15925920#comment-15925920 ] ASF GitHub Bot commented on FLINK-4460: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r106139530 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java --- @@ -441,26 +486,53 @@ public void close() { private static final class CopyingChainingOutput extends ChainingOutput { private final TypeSerializer serializer; - + public CopyingChainingOutput( OneInputStreamOperatoroperator, TypeSerializer serializer, + OutputTag outputTag, StreamStatusProvider streamStatusProvider) { - super(operator, streamStatusProvider); + super(operator, streamStatusProvider, outputTag); this.serializer = serializer; } @Override public void collect(StreamRecord record) { + if (this.outputTag != null) { + // we are only responsible for emitting to the main input + return; + } + + pushToOperator(record); + } + + @Override + public void collect(OutputTag outputTag, StreamRecord record) { + if (this.outputTag == null || !this.outputTag.equals(outputTag)) { + // we are only responsible for emitting to the side-output specified by our + // OutputTag. + return; + } + + pushToOperator(record); + } + + @Override --- End diff -- This can become `private`, as before. > Side Outputs in Flink > - > > Key: FLINK-4460 > URL: https://issues.apache.org/jira/browse/FLINK-4460 > Project: Flink > Issue Type: New Feature > Components: Core, DataStream API >Affects Versions: 1.2.0, 1.1.3 >Reporter: Chen Qin >Assignee: Chen Qin > Labels: latearrivingevents, sideoutput > > https://docs.google.com/document/d/1vg1gpR8JL4dM07Yu4NyerQhhVvBlde5qdqnuJv4LcV4/edit?usp=sharing -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15906129#comment-15906129 ] ASF GitHub Bot commented on FLINK-4460: --- Github user kl0u commented on the issue: https://github.com/apache/flink/pull/3484 Thanks @aljoscha I will have a look on Monday. > Side Outputs in Flink > - > > Key: FLINK-4460 > URL: https://issues.apache.org/jira/browse/FLINK-4460 > Project: Flink > Issue Type: New Feature > Components: Core, DataStream API >Affects Versions: 1.2.0, 1.1.3 >Reporter: Chen Qin >Assignee: Chen Qin > Labels: latearrivingevents, sideoutput > > https://docs.google.com/document/d/1vg1gpR8JL4dM07Yu4NyerQhhVvBlde5qdqnuJv4LcV4/edit?usp=sharing -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15906128#comment-15906128 ] ASF GitHub Bot commented on FLINK-4460: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/3484 @kl0u @chenqin I cleaned up the commits, distributed the fixes from the comments to the right commits. I also added more tests/ITCases for: detecting name clashes in side output IDs, side outputs with multiple consumers. > Side Outputs in Flink > - > > Key: FLINK-4460 > URL: https://issues.apache.org/jira/browse/FLINK-4460 > Project: Flink > Issue Type: New Feature > Components: Core, DataStream API >Affects Versions: 1.2.0, 1.1.3 >Reporter: Chen Qin >Assignee: Chen Qin > Labels: latearrivingevents, sideoutput > > https://docs.google.com/document/d/1vg1gpR8JL4dM07Yu4NyerQhhVvBlde5qdqnuJv4LcV4/edit?usp=sharing -- This message was sent by Atlassian JIRA (v6.3.15#6346)
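Tag equality is based on the id alone, so one plausible way to detect a name clash is to reject a second tag that reuses an id with a different element type. This is an assumption; the PR's actual check is not shown here. The sketch below uses a hypothetical `SideOutputRegistry` keyed by id, with `Class` objects in place of `TypeInformation`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical registry: remembers the element type declared for each side-output id
// and fails fast when the same id is reused with a different type.
final class SideOutputRegistry {
    private final Map<String, Class<?>> typesById = new HashMap<>();

    void register(String id, Class<?> type) {
        Class<?> existing = typesById.putIfAbsent(id, type);
        if (existing != null && !existing.equals(type)) {
            throw new IllegalArgumentException(
                "Side output id '" + id + "' is already used with type "
                    + existing.getName() + "; cannot reuse it with type " + type.getName());
        }
    }

    int size() {
        return typesById.size();
    }
}
```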
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905414#comment-15905414 ] ASF GitHub Bot commented on FLINK-4460: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105441679 --- Diff: flink-core/src/main/java/org/apache/flink/util/OutputTag.java --- @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.util; + +import static java.util.Objects.requireNonNull; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.Serializable; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; + + +/** + * An {@link OutputTag} is a typed and named tag to use for tagging side outputs + * of an operator. + * + * An {@code OutputTag} must always be an anonymous inner class so that Flink can derive + * a {@link TypeInformation} for the generic type parameter. 
+ * + * Example: + * {@code + * OutputTag> info = new OutputTag >("late-data"){}); + * } + * + * @param the type of elements in the side-output stream. + */ +@PublicEvolving +public class OutputTag implements Serializable { + + private static final long serialVersionUID = 1L; + + private final String id; + + private transient TypeInformation typeInfo; + + /** +* Creates a new named {@code OutputTag} with the given id. +* +* @param id The id of the created {@code OutputTag}. + */ + public OutputTag(String id) { + Preconditions.checkNotNull(id, "OutputTag id cannot be null."); + this.id = requireNonNull(id); + + try { + TypeHint typeHint = + new TypeHint(OutputTag.class, this, 0) {}; + this.typeInfo = typeHint.getTypeInfo(); + } catch (InvalidTypesException e) { + throw new InvalidTypesException("Could not determine TypeInformation for generic " + + "OutputTag type. Did you forget to make your OutputTag an anonymous inner class?", e); + } + } + + /** +* Creates a new named {@code OutputTag} with the given id and output {@link TypeInformation}. +* +* @param id The id of the created {@code OutputTag}. +* @param typeInfo The {@code TypeInformation} for the side output. +*/ + public OutputTag(String id, TypeInformation typeInfo) { + this.id = Preconditions.checkNotNull(id, "OutputTag id cannot be null."); + this.typeInfo = + Preconditions.checkNotNull(typeInfo, "TypeInformation cannot be null."); + } + + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + in.defaultReadObject(); + typeInfo = null; + } + + public String getId() { + return id; + } + + public TypeInformation getTypeInfo() { + return typeInfo; + } + + @Override + public boolean equals(Object obj) { + return obj instanceof OutputTag --- End diff -- I see. The problem is that if this does not work, then we can have important side effects. 
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905412#comment-15905412 ] ASF GitHub Bot commented on FLINK-4460: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105441456 --- Diff: flink-core/src/main/java/org/apache/flink/util/OutputTag.java --- @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.util; + +import static java.util.Objects.requireNonNull; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.Serializable; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; + + +/** + * An {@link OutputTag} is a typed and named tag to use for tagging side outputs + * of an operator. + * + * An {@code OutputTag} must always be an anonymous inner class so that Flink can derive + * a {@link TypeInformation} for the generic type parameter. 
+ * + * Example: + * {@code + * OutputTag> info = new OutputTag >("late-data"){}); + * } + * + * @param the type of elements in the side-output stream. + */ +@PublicEvolving +public class OutputTag implements Serializable { + + private static final long serialVersionUID = 1L; + + private final String id; + + private transient TypeInformation typeInfo; + + /** +* Creates a new named {@code OutputTag} with the given id. +* +* @param id The id of the created {@code OutputTag}. + */ + public OutputTag(String id) { + Preconditions.checkNotNull(id, "OutputTag id cannot be null."); + this.id = requireNonNull(id); + + try { + TypeHint typeHint = + new TypeHint(OutputTag.class, this, 0) {}; + this.typeInfo = typeHint.getTypeInfo(); + } catch (InvalidTypesException e) { + throw new InvalidTypesException("Could not determine TypeInformation for generic " + + "OutputTag type. Did you forget to make your OutputTag an anonymous inner class?", e); + } + } + + /** +* Creates a new named {@code OutputTag} with the given id and output {@link TypeInformation}. +* +* @param id The id of the created {@code OutputTag}. +* @param typeInfo The {@code TypeInformation} for the side output. +*/ + public OutputTag(String id, TypeInformation typeInfo) { + this.id = Preconditions.checkNotNull(id, "OutputTag id cannot be null."); + this.typeInfo = + Preconditions.checkNotNull(typeInfo, "TypeInformation cannot be null."); + } + + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + in.defaultReadObject(); + typeInfo = null; + } + + public String getId() { + return id; + } + + public TypeInformation getTypeInfo() { + return typeInfo; + } + + @Override + public boolean equals(Object obj) { + return obj instanceof OutputTag --- End diff -- I would have liked to include the `TypeInformation` into the check but we can't do that because it's transient. I'll try and figure something out for checking that side outputs are unique, not as easy as it seems. 
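The quoted `readObject` shows why the `TypeInformation` cannot take part in `equals`. The field is `transient` and is explicitly reset to `null`, so after Java serialization only the `id` survives. The self-contained sketch below demonstrates this with a hypothetical `TransientTag` class, in which a `String` field stands in for the `TypeInformation`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in: 'typeDescription' plays the role of the transient TypeInformation.
final class TransientTag implements Serializable {
    private static final long serialVersionUID = 1L;

    final String id;
    transient String typeDescription;

    TransientTag(String id, String typeDescription) {
        this.id = id;
        this.typeDescription = typeDescription;
    }

    // Writes the object with Java serialization and reads it back.
    static TransientTag roundTrip(TransientTag tag) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(tag);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (TransientTag) in.readObject();
        }
    }
}
```

After the round trip, `typeDescription` is `null` while `id` is intact, so an `equals` that compared the type would behave differently before and after shipping the tag.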
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905390#comment-15905390 ] ASF GitHub Bot commented on FLINK-4460: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/3484 Thanks @kl0u for the (already) quite thorough review! I'll push a commit with fixes. > Side Outputs in Flink > - > > Key: FLINK-4460 > URL: https://issues.apache.org/jira/browse/FLINK-4460 > Project: Flink > Issue Type: New Feature > Components: Core, DataStream API >Affects Versions: 1.2.0, 1.1.3 >Reporter: Chen Qin >Assignee: Chen Qin > Labels: latearrivingevents, sideoutput > > https://docs.google.com/document/d/1vg1gpR8JL4dM07Yu4NyerQhhVvBlde5qdqnuJv4LcV4/edit?usp=sharing -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905381#comment-15905381 ] ASF GitHub Bot commented on FLINK-4460: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105436408 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java --- @@ -539,5 +625,26 @@ public void collect(StreamRecord record) { // don't copy for the last output outputs[outputs.length - 1].collect(record); } + --- End diff -- fixed. > Side Outputs in Flink > - > > Key: FLINK-4460 > URL: https://issues.apache.org/jira/browse/FLINK-4460 > Project: Flink > Issue Type: New Feature > Components: Core, DataStream API >Affects Versions: 1.2.0, 1.1.3 >Reporter: Chen Qin >Assignee: Chen Qin > Labels: latearrivingevents, sideoutput > > https://docs.google.com/document/d/1vg1gpR8JL4dM07Yu4NyerQhhVvBlde5qdqnuJv4LcV4/edit?usp=sharing -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905380#comment-15905380 ] ASF GitHub Bot commented on FLINK-4460: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105436193 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java --- @@ -387,14 +403,25 @@ public int getChainLength() { protected final StreamStatusProvider streamStatusProvider; - public ChainingOutput(OneInputStreamOperatoroperator, StreamStatusProvider streamStatusProvider) { + protected final OutputTag outputTag; + + public ChainingOutput( + OneInputStreamOperator operator, + StreamStatusProvider streamStatusProvider, + OutputTag outputTag) { this.operator = operator; this.numRecordsIn = ((OperatorMetricGroup) operator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter(); this.streamStatusProvider = streamStatusProvider; + this.outputTag = outputTag; } @Override --- End diff -- This I'm fixing, as I mentioned above. > Side Outputs in Flink > - > > Key: FLINK-4460 > URL: https://issues.apache.org/jira/browse/FLINK-4460 > Project: Flink > Issue Type: New Feature > Components: Core, DataStream API >Affects Versions: 1.2.0, 1.1.3 >Reporter: Chen Qin >Assignee: Chen Qin > Labels: latearrivingevents, sideoutput > > https://docs.google.com/document/d/1vg1gpR8JL4dM07Yu4NyerQhhVvBlde5qdqnuJv4LcV4/edit?usp=sharing -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905378#comment-15905378 ] ASF GitHub Bot commented on FLINK-4460: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105435806 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java --- @@ -72,6 +76,11 @@ public RecordWriterOutput( --- End diff -- I can remove the duplication in `RecordWriterOutput`, `ChainingOutput` and `CopyingChainingOutput`. These are the `Outputs` that don't forward to other outputs but instead push into the operator or into the network. For the other `Outputs`, removing the duplication is not possible because inside the respective `output()` method they call `output()` of another `Output`. They call it either with an `OutputTag` or without one, so the method bodies are not actually duplicates. I did find another bug, though: `CopyingBroadcastingOutputCollector` in `OperatorChain` was not calling the correct `collect()` method on the downstream `Outputs`.
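The `CopyingBroadcastingOutputCollector` bug concerns side-output records. Each downstream output must receive them through the tagged `collect(tag, record)` overload. Calling the untagged overload loses the tag. Below is a minimal sketch that assumes a hypothetical `SimpleOutput` interface with the two overloads. It also includes a hypothetical `CopyingBroadcaster` that copies for every output except the last, as the quoted comment `// don't copy for the last output` describes.

```java
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical minimal output interface with a main-output and a side-output overload.
interface SimpleOutput<T> {
    void collect(T record);
    void collect(String tag, T record);
}

// Hypothetical broadcaster: forwards to every downstream output, copying for all
// but the last one, and always preserving the tag on side-output records.
final class CopyingBroadcaster<T> implements SimpleOutput<T> {
    private final List<SimpleOutput<T>> outputs;
    private final UnaryOperator<T> copier;

    CopyingBroadcaster(List<SimpleOutput<T>> outputs, UnaryOperator<T> copier) {
        if (outputs.isEmpty()) {
            throw new IllegalArgumentException("at least one output is required");
        }
        this.outputs = outputs;
        this.copier = copier;
    }

    @Override
    public void collect(T record) {
        for (int i = 0; i < outputs.size() - 1; i++) {
            outputs.get(i).collect(copier.apply(record));
        }
        // don't copy for the last output
        outputs.get(outputs.size() - 1).collect(record);
    }

    @Override
    public void collect(String tag, T record) {
        for (int i = 0; i < outputs.size() - 1; i++) {
            // Use the tagged overload; collect(copy) would drop the tag.
            outputs.get(i).collect(tag, copier.apply(record));
        }
        outputs.get(outputs.size() - 1).collect(tag, record);
    }
}
```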
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905199#comment-15905199 ] ASF GitHub Bot commented on FLINK-4460: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105409533 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java --- @@ -72,6 +76,11 @@ public RecordWriterOutput( --- End diff -- We can't throw, because that would crash the program. This is a good catch, though! I will remove the duplication. > Side Outputs in Flink > - > > Key: FLINK-4460 > URL: https://issues.apache.org/jira/browse/FLINK-4460 > Project: Flink > Issue Type: New Feature > Components: Core, DataStream API >Affects Versions: 1.2.0, 1.1.3 >Reporter: Chen Qin >Assignee: Chen Qin > Labels: latearrivingevents, sideoutput > > https://docs.google.com/document/d/1vg1gpR8JL4dM07Yu4NyerQhhVvBlde5qdqnuJv4LcV4/edit?usp=sharing -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905145#comment-15905145 ] ASF GitHub Bot commented on FLINK-4460: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105402830 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java --- @@ -1618,14 +1664,16 @@ public void testDropDueToLatenessSessionZeroLateness() throws Exception { stateDesc, new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()), EventTimeTrigger.create(), - LATENESS); + LATENESS, --- End diff -- wrong alignment > Side Outputs in Flink > - > > Key: FLINK-4460 > URL: https://issues.apache.org/jira/browse/FLINK-4460 > Project: Flink > Issue Type: New Feature > Components: Core, DataStream API >Affects Versions: 1.2.0, 1.1.3 >Reporter: Chen Qin >Assignee: Chen Qin > Labels: latearrivingevents, sideoutput > > https://docs.google.com/document/d/1vg1gpR8JL4dM07Yu4NyerQhhVvBlde5qdqnuJv4LcV4/edit?usp=sharing -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905139#comment-15905139 ] ASF GitHub Bot commented on FLINK-4460: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105401808 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java --- @@ -441,26 +491,55 @@ public void close() { private static final class CopyingChainingOutput extends ChainingOutput { private final TypeSerializer serializer; - + public CopyingChainingOutput( OneInputStreamOperatoroperator, TypeSerializer serializer, + OutputTag outputTag, StreamStatusProvider streamStatusProvider) { - super(operator, streamStatusProvider); + super(operator, streamStatusProvider, outputTag); this.serializer = serializer; } --- End diff -- Again the two `collect()` have duplicate code (after the casting). > Side Outputs in Flink > - > > Key: FLINK-4460 > URL: https://issues.apache.org/jira/browse/FLINK-4460 > Project: Flink > Issue Type: New Feature > Components: Core, DataStream API >Affects Versions: 1.2.0, 1.1.3 >Reporter: Chen Qin >Assignee: Chen Qin > Labels: latearrivingevents, sideoutput > > https://docs.google.com/document/d/1vg1gpR8JL4dM07Yu4NyerQhhVvBlde5qdqnuJv4LcV4/edit?usp=sharing -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905151#comment-15905151 ] ASF GitHub Bot commented on FLINK-4460: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105400890 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java --- @@ -326,33 +327,48 @@ public int getChainLength() { MapchainedConfigs, ClassLoader userCodeClassloader, Map streamOutputs, - List allOperators) + List allOperators, + OutputTag outputTag) { // create the output that the operator writes to first. this may recursively create more operators Output output = createOutputCollector( containingTask, operatorConfig, chainedConfigs, userCodeClassloader, streamOutputs, allOperators); // now create the operator and give it the output collector to write its output to OneInputStreamOperator chainedOperator = operatorConfig.getStreamOperator(userCodeClassloader); + chainedOperator.setup(containingTask, operatorConfig, output); allOperators.add(chainedOperator); if (containingTask.getExecutionConfig().isObjectReuseEnabled()) { - return new ChainingOutput<>(chainedOperator, this); + return new ChainingOutput<>(chainedOperator, this, outputTag); } else { TypeSerializer inSerializer = operatorConfig.getTypeSerializerIn1(userCodeClassloader); - return new CopyingChainingOutput<>(chainedOperator, inSerializer, this); + return new CopyingChainingOutput<>(chainedOperator, inSerializer, outputTag, this); } } private RecordWriterOutput createStreamOutput( StreamEdge edge, StreamConfig upStreamConfig, int outputIndex, Environment taskEnvironment, - String taskName) - { - TypeSerializer outSerializer = upStreamConfig.getTypeSerializerOut(taskEnvironment.getUserClassLoader()); + String taskName) { + OutputTag sideOutputTag = edge.getOutputTag(); // OutputTag, return null if not sideOutput + + TypeSerializer outSerializer = 
null; + --- End diff -- Too much line breaking in the following `if() else()`. We could easily reduce it, or even replace it with:
```
TypeSerializer outSerializer = (edge.getOutputTag() != null)
    ? upStreamConfig.getTypeSerializerSideOut(edge.getOutputTag(), taskEnvironment.getUserClassLoader())
    : upStreamConfig.getTypeSerializerOut(taskEnvironment.getUserClassLoader());
```
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905149#comment-15905149 ] ASF GitHub Bot commented on FLINK-4460: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105402899 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java --- @@ -1702,15 +1754,16 @@ public void testDropDueToLatenessSessionWithLatenessPurgingTrigger() throws Exce stateDesc, new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()), PurgingTrigger.of(EventTimeTrigger.create()), - LATENESS); + LATENESS, --- End diff -- wrong alignment > Side Outputs in Flink > - > > Key: FLINK-4460 > URL: https://issues.apache.org/jira/browse/FLINK-4460 > Project: Flink > Issue Type: New Feature > Components: Core, DataStream API >Affects Versions: 1.2.0, 1.1.3 >Reporter: Chen Qin >Assignee: Chen Qin > Labels: latearrivingevents, sideoutput > > https://docs.google.com/document/d/1vg1gpR8JL4dM07Yu4NyerQhhVvBlde5qdqnuJv4LcV4/edit?usp=sharing -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905137#comment-15905137 ] ASF GitHub Bot commented on FLINK-4460: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105397306 --- Diff: flink-core/src/main/java/org/apache/flink/util/OutputTag.java --- @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.util; + +import static java.util.Objects.requireNonNull; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.Serializable; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; + + +/** + * An {@link OutputTag} is a typed and named tag to use for tagging side outputs + * of an operator. + * + * An {@code OutputTag} must always be an anonymous inner class so that Flink can derive + * a {@link TypeInformation} for the generic type parameter. 
+ * + * Example: + * {@code + * OutputTag> info = new OutputTag >("late-data"){}); + * } + * + * @param the type of elements in the side-output stream. + */ +@PublicEvolving +public class OutputTag implements Serializable { + + private static final long serialVersionUID = 1L; + + private final String id; + + private transient TypeInformation typeInfo; + + /** +* Creates a new named {@code OutputTag} with the given id. +* +* @param id The id of the created {@code OutputTag}. + */ + public OutputTag(String id) { + Preconditions.checkNotNull(id, "OutputTag id cannot be null."); --- End diff -- We do not need both lines with the checks. We can just have: `this.id = Preconditions.checkNotNull(id, "OutputTag id cannot be null.");` > Side Outputs in Flink > - > > Key: FLINK-4460 > URL: https://issues.apache.org/jira/browse/FLINK-4460 > Project: Flink > Issue Type: New Feature > Components: Core, DataStream API >Affects Versions: 1.2.0, 1.1.3 >Reporter: Chen Qin >Assignee: Chen Qin > Labels: latearrivingevents, sideoutput > > https://docs.google.com/document/d/1vg1gpR8JL4dM07Yu4NyerQhhVvBlde5qdqnuJv4LcV4/edit?usp=sharing -- This message was sent by Atlassian JIRA (v6.3.15#6346)
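The suggestion works because the null check returns its argument. The sketch below shows the same idiom with the JDK's `java.util.Objects.requireNonNull`. It uses that method instead of Flink's `Preconditions.checkNotNull` so that it needs no Flink dependency. `NamedTag` is a hypothetical class.

```java
import java.util.Objects;

// Hypothetical class showing check-and-assign in a single statement.
final class NamedTag {
    private final String id;

    NamedTag(String id) {
        // requireNonNull throws a NullPointerException with the given message when id is null,
        // and otherwise returns id unchanged, so a separate preceding check is redundant.
        this.id = Objects.requireNonNull(id, "OutputTag id cannot be null.");
    }

    String getId() {
        return id;
    }
}
```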
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905144#comment-15905144 ] ASF GitHub Bot commented on FLINK-4460: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105403355 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java --- @@ -40,6 +41,12 @@ public void collect(StreamRecord record) { } @Override + public void collect( --- End diff -- The signature fits on one line.
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905150#comment-15905150 ] ASF GitHub Bot commented on FLINK-4460: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105397483 --- Diff: flink-core/src/main/java/org/apache/flink/util/OutputTag.java --- @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.util; + +import static java.util.Objects.requireNonNull; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.Serializable; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; + + +/** + * An {@link OutputTag} is a typed and named tag to use for tagging side outputs + * of an operator. + * + * An {@code OutputTag} must always be an anonymous inner class so that Flink can derive + * a {@link TypeInformation} for the generic type parameter. 
+ * + * Example: + * {@code + * OutputTag> info = new OutputTag >("late-data"){}); + * } + * + * @param the type of elements in the side-output stream. + */ +@PublicEvolving +public class OutputTag implements Serializable { + + private static final long serialVersionUID = 1L; + + private final String id; + + private transient TypeInformation typeInfo; + + /** +* Creates a new named {@code OutputTag} with the given id. +* +* @param id The id of the created {@code OutputTag}. + */ + public OutputTag(String id) { + Preconditions.checkNotNull(id, "OutputTag id cannot be null."); + this.id = requireNonNull(id); + + try { + TypeHint typeHint = + new TypeHint(OutputTag.class, this, 0) {}; + this.typeInfo = typeHint.getTypeInfo(); + } catch (InvalidTypesException e) { + throw new InvalidTypesException("Could not determine TypeInformation for generic " + + "OutputTag type. Did you forget to make your OutputTag an anonymous inner class?", e); + } + } + + /** +* Creates a new named {@code OutputTag} with the given id and output {@link TypeInformation}. +* +* @param id The id of the created {@code OutputTag}. +* @param typeInfo The {@code TypeInformation} for the side output. +*/ + public OutputTag(String id, TypeInformation typeInfo) { + this.id = Preconditions.checkNotNull(id, "OutputTag id cannot be null."); + this.typeInfo = --- End diff -- No need for line breaking.
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905138#comment-15905138 ] ASF GitHub Bot commented on FLINK-4460: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105398836 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java --- @@ -300,6 +303,36 @@ private StreamGraph generateInternal(Listtransformatio } /** +* Transforms a {@code SideOutputTransformation}. +* +* +* For this we create a virtual node in the {@code StreamGraph} that holds the side-output +* {@link org.apache.flink.util.OutputTag}. +* +* @see org.apache.flink.streaming.api.graph.StreamGraphGenerator +*/ + private Collection transformSideOutput(SideOutputTransformation sideOutput) { + StreamTransformation input = sideOutput.getInput(); + Collection resultIds = transform(input); + + + // the recursive transform might have already transformed this + if (alreadyTransformed.containsKey(sideOutput)) { + return alreadyTransformed.get(sideOutput); + } + + List virtualResultIds = new ArrayList<>(); + + for (int inputId : resultIds) { + int virtualId = StreamTransformation.getNewNodeId(); + streamGraph.addVirtualSideOutputNode(inputId, virtualId, sideOutput.getOutputTag()); + virtualResultIds.add(virtualId); + } + return virtualResultIds; + } + + --- End diff -- Leave only one empty line.
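As the Javadoc in this diff says, `transformSideOutput()` creates no physical operator. It only registers one virtual node per upstream node id, and that virtual node remembers which tag to select. Below is a minimal sketch of that bookkeeping. It uses integer node ids and a plain string in place of the `OutputTag`. The class, fields and id counter are hypothetical and are not `StreamGraph`'s actual API.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of transformSideOutput(): one virtual node per upstream
// node, each remembering (physical upstream id, side-output tag).
class VirtualSideOutputNodes {
    private int nextNodeId; // stand-in for StreamTransformation.getNewNodeId()
    final Map<Integer, Map.Entry<Integer, String>> nodes = new HashMap<>();

    VirtualSideOutputNodes(int firstFreeId) {
        this.nextNodeId = firstFreeId;
    }

    List<Integer> transformSideOutput(Collection<Integer> upstreamIds, String outputTag) {
        List<Integer> virtualIds = new ArrayList<>();
        for (int inputId : upstreamIds) {
            int virtualId = nextNodeId++;
            nodes.put(virtualId, new AbstractMap.SimpleEntry<>(inputId, outputTag));
            virtualIds.add(virtualId);
        }
        return virtualIds;
    }
}
```

Downstream operators then connect to the returned virtual ids. The edges are only mapped back to the physical node and tag when they are added.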
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905143#comment-15905143 ] ASF GitHub Bot commented on FLINK-4460: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105402616 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java --- @@ -1267,14 +1291,16 @@ public void testLateness() throws Exception { stateDesc, new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction>()), PurgingTrigger.of(EventTimeTrigger.create()), - LATENESS); + LATENESS, + lateOutputTag); --- End diff -- wrong alignment
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905142#comment-15905142 ] ASF GitHub Bot commented on FLINK-4460: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105401503 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java --- @@ -387,14 +403,25 @@ public int getChainLength() { protected final StreamStatusProvider streamStatusProvider; - public ChainingOutput(OneInputStreamOperatoroperator, StreamStatusProvider streamStatusProvider) { + protected final OutputTag outputTag; + + public ChainingOutput( + OneInputStreamOperator operator, + StreamStatusProvider streamStatusProvider, + OutputTag outputTag) { this.operator = operator; this.numRecordsIn = ((OperatorMetricGroup) operator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter(); this.streamStatusProvider = streamStatusProvider; + this.outputTag = outputTag; } @Override --- End diff -- Again, the two `collect()` methods share most of their code. We can put that common code in a separate private method and call it instead of repeating the code.
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905147#comment-15905147 ] ASF GitHub Bot commented on FLINK-4460: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105402678 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java --- @@ -1393,14 +1428,16 @@ public void testDropDueToLatenessTumbling() throws Exception { stateDesc, new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction>()), EventTimeTrigger.create(), - LATENESS); + LATENESS, + lateOutputTag); --- End diff -- wrong alignment
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905153#comment-15905153 ] ASF GitHub Bot commented on FLINK-4460: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105403240 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectorOutput.java --- @@ -53,5 +54,11 @@ public void collect(StreamRecord record) { } @Override --- End diff -- The signature can go on the same line
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905146#comment-15905146 ] ASF GitHub Bot commented on FLINK-4460: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105398702 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java --- @@ -333,32 +356,40 @@ public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int ty downStreamVertexID, typeNumber, null, - new ArrayList()); + new ArrayList(), null); } private void addEdgeInternal(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber, StreamPartitioner partitioner, - List outputNames) { - + List outputNames, + OutputTag outputTag) { - if (virtualSelectNodes.containsKey(upStreamVertexID)) { + if (virtualSideOutputNodes.containsKey(upStreamVertexID)) { + int virtualId = upStreamVertexID; + upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0; + if (outputTag == null) { + outputTag = virtualSideOutputNodes.get(virtualId).f1; + } + addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, null, outputTag); + } else if (virtualSelectNodes.containsKey(upStreamVertexID)) { int virtualId = upStreamVertexID; upStreamVertexID = virtualSelectNodes.get(virtualId).f0; if (outputNames.isEmpty()) { // selections that happen downstream override earlier selections outputNames = virtualSelectNodes.get(virtualId).f1; } - addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames); + addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag); } else if (virtualPartitionNodes.containsKey(upStreamVertexID)) { int virtualId = upStreamVertexID; upStreamVertexID = virtualPartitionNodes.get(virtualId).f0; if (partitioner == null) { partitioner = virtualPartitionNodes.get(virtualId).f1; } - addEdgeInternal(upStreamVertexID, 
downStreamVertexID, typeNumber, partitioner, outputNames); + --- End diff -- Remove line for uniformity with the `if() ...` above.
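The new branch in `addEdgeInternal()` maps a virtual side-output node back to its physical upstream node and passes the tag along. The upstream of a virtual node can itself be virtual, so the method recurses. Because of the `if (outputTag == null)` guard, a tag that was already resolved closer to the downstream node is not overwritten. Below is a simplified model of only that branch. It uses integer ids, string tags and a list of edge strings. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the virtual-side-output branch of addEdgeInternal().
class EdgeResolution {
    final Map<Integer, Integer> virtualToUpstream = new HashMap<>();
    final Map<Integer, String> virtualToTag = new HashMap<>();
    final List<String> physicalEdges = new ArrayList<>();

    void addEdge(int upstreamId, int downstreamId, String outputTag) {
        if (virtualToUpstream.containsKey(upstreamId)) {
            // keep a tag that was already resolved closer to the downstream node
            if (outputTag == null) {
                outputTag = virtualToTag.get(upstreamId);
            }
            // recurse: the upstream of a virtual node may itself be virtual
            addEdge(virtualToUpstream.get(upstreamId), downstreamId, outputTag);
        } else {
            // physical node reached: record the edge together with the selected tag, if any
            physicalEdges.add(upstreamId + "->" + downstreamId
                    + (outputTag == null ? "" : " [" + outputTag + "]"));
        }
    }
}
```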
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905155#comment-15905155 ] ASF GitHub Bot commented on FLINK-4460: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105397933 --- Diff: flink-core/src/main/java/org/apache/flink/util/OutputTag.java --- @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.util; + +import static java.util.Objects.requireNonNull; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.Serializable; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; + + +/** + * An {@link OutputTag} is a typed and named tag to use for tagging side outputs + * of an operator. + * + * An {@code OutputTag} must always be an anonymous inner class so that Flink can derive + * a {@link TypeInformation} for the generic type parameter. 
+ * + * Example: + * {@code + * OutputTag> info = new OutputTag >("late-data"){}); + * } + * + * @param the type of elements in the side-output stream. + */ +@PublicEvolving +public class OutputTag implements Serializable { + + private static final long serialVersionUID = 1L; + + private final String id; + + private transient TypeInformation typeInfo; + + /** +* Creates a new named {@code OutputTag} with the given id. +* +* @param id The id of the created {@code OutputTag}. + */ + public OutputTag(String id) { + Preconditions.checkNotNull(id, "OutputTag id cannot be null."); + this.id = requireNonNull(id); + + try { + TypeHint typeHint = + new TypeHint(OutputTag.class, this, 0) {}; + this.typeInfo = typeHint.getTypeInfo(); + } catch (InvalidTypesException e) { + throw new InvalidTypesException("Could not determine TypeInformation for generic " + + "OutputTag type. Did you forget to make your OutputTag an anonymous inner class?", e); + } + } + + /** +* Creates a new named {@code OutputTag} with the given id and output {@link TypeInformation}. +* +* @param id The id of the created {@code OutputTag}. +* @param typeInfo The {@code TypeInformation} for the side output. +*/ + public OutputTag(String id, TypeInformation typeInfo) { + this.id = Preconditions.checkNotNull(id, "OutputTag id cannot be null."); + this.typeInfo = + Preconditions.checkNotNull(typeInfo, "TypeInformation cannot be null."); + } + + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + in.defaultReadObject(); + typeInfo = null; + } + + public String getId() { + return id; + } + + public TypeInformation getTypeInfo() { + return typeInfo; + } + + @Override + public boolean equals(Object obj) { + return obj instanceof OutputTag --- End diff -- Two points: 1) we cannot have `this.id == null` or `(OutputTag) obj).id == null` because we check at the constructor, so this method can be simplified. 2) we never check for uniqueness of the `outputTag.id`. 
We should do it during translation. This is also a correctness issue, as it may result in undesired side-output "collisions".
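Both points can be sketched outside Flink. The constructor guarantees a non-null id, so `equals`/`hashCode` can compare ids directly. A translation-time check can then reject one id that is reused with different element types. Both classes below are hypothetical. A `Class<?>` stands in for `TypeInformation`. The registry enforces only one possible uniqueness policy (same id, different type is an error) and is not the check the PR ends up adding.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical tag: id and type are checked once in the constructor, so equals/hashCode
// need no null handling.
class SideTag {
    final String id;
    final Class<?> type; // stand-in for TypeInformation

    SideTag(String id, Class<?> type) {
        this.id = Objects.requireNonNull(id, "OutputTag id cannot be null.");
        this.type = Objects.requireNonNull(type, "TypeInformation cannot be null.");
    }

    @Override
    public boolean equals(Object obj) {
        return obj instanceof SideTag && ((SideTag) obj).id.equals(this.id);
    }

    @Override
    public int hashCode() {
        return id.hashCode();
    }
}

// Hypothetical translation-time check against side-output "collisions".
class SideTagRegistry {
    private final Map<String, SideTag> seen = new HashMap<>();

    void register(SideTag tag) {
        SideTag previous = seen.putIfAbsent(tag.id, tag);
        if (previous != null && !previous.type.equals(tag.type)) {
            throw new IllegalStateException("Side output id '" + tag.id
                    + "' is already used with type " + previous.type.getName());
        }
    }
}
```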
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905148#comment-15905148 ] ASF GitHub Bot commented on FLINK-4460: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105402262 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java --- @@ -539,5 +625,26 @@ public void collect(StreamRecord record) { // don't copy for the last output outputs[outputs.length - 1].collect(record); } + + @Override + public void collect( + OutputTag outputTag, StreamRecord record) { + for (int i = 0; i < outputs.length - 1; i++) { + Outputoutput = outputs[i]; + + // due to side outputs, StreamRecords of varying types can pass through the broadcasting + // collector so we need to cast + @SuppressWarnings({"unchecked", "rawtypes"}) + StreamRecord shallowCopy = (StreamRecord) record.copy(record.getValue()); + output.collect(shallowCopy); + } + --- End diff -- This duplicates the comment inside the loop. Also, the "// don't copy for the last output" comment is not accurate, because we actually create a copy before.
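Below is a variant in which the "don't copy for the last output" comment would be accurate. Every output except the last receives its own shallow copy, and the last receives the original record. This is a standalone sketch. `Rec` is a hypothetical stand-in for `StreamRecord`, and `Consumer` stands in for `Output`.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical record with a shallow copy, modelled on StreamRecord.copy(value).
class Rec<T> {
    final T value;

    Rec(T value) {
        this.value = value;
    }

    Rec<T> copy(T newValue) {
        return new Rec<>(newValue);
    }
}

// Hypothetical copying broadcaster.
class CopyingBroadcaster<T> {
    private final List<Consumer<Rec<T>>> outputs;

    CopyingBroadcaster(List<Consumer<Rec<T>>> outputs) {
        this.outputs = outputs;
    }

    void collect(Rec<T> record) {
        // every output but the last gets its own shallow copy
        for (int i = 0; i < outputs.size() - 1; i++) {
            outputs.get(i).accept(record.copy(record.value));
        }
        // don't copy for the last output: it receives the original record
        if (!outputs.isEmpty()) {
            outputs.get(outputs.size() - 1).accept(record);
        }
    }
}
```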
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905140#comment-15905140 ] ASF GitHub Bot commented on FLINK-4460: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105402746 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java --- @@ -1528,14 +1572,16 @@ public void testDropDueToLatenessSessionZeroLatenessPurgingTrigger() throws Exce stateDesc, new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()), PurgingTrigger.of(EventTimeTrigger.create()), - LATENESS); + LATENESS, + lateOutputTag); --- End diff -- wrong alignment
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905135#comment-15905135 ] ASF GitHub Bot commented on FLINK-4460: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105397424 --- Diff: flink-core/src/main/java/org/apache/flink/util/OutputTag.java --- @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.util; + +import static java.util.Objects.requireNonNull; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.Serializable; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; + + +/** + * An {@link OutputTag} is a typed and named tag to use for tagging side outputs + * of an operator. + * + * An {@code OutputTag} must always be an anonymous inner class so that Flink can derive + * a {@link TypeInformation} for the generic type parameter. 
+ * + * Example: + * {@code + * OutputTag> info = new OutputTag >("late-data"){}); + * } + * + * @param the type of elements in the side-output stream. + */ +@PublicEvolving +public class OutputTag implements Serializable { + + private static final long serialVersionUID = 1L; + + private final String id; + + private transient TypeInformation typeInfo; + + /** +* Creates a new named {@code OutputTag} with the given id. +* +* @param id The id of the created {@code OutputTag}. + */ + public OutputTag(String id) { + Preconditions.checkNotNull(id, "OutputTag id cannot be null."); + this.id = requireNonNull(id); + + try { --- End diff -- No need for line breaking: `TypeHint typeHint = new TypeHint(OutputTag.class, this, 0) {};`
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905134#comment-15905134 ] ASF GitHub Bot commented on FLINK-4460: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105397096 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeHint.java --- @@ -46,7 +46,15 @@ public TypeHint() { this.typeInfo = TypeExtractor.createTypeInfo(this, TypeHint.class, getClass(), 0); } - + + /** +* Creates a hint for the generic type in the class signature. +*/ + public TypeHint(Class baseClass, Object instance, int genericParameterPos) { + this.typeInfo = TypeExtractor.createTypeInfo(instance, baseClass, instance.getClass(), genericParameterPos); + } + + --- End diff -- Remove one of the 2 empty lines.
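The new `TypeHint(baseClass, instance, genericParameterPos)` constructor reads the type argument from the runtime class of `instance`. That is why the `OutputTag` Javadoc requires an anonymous inner class: only a subclass keeps the type argument in its class signature. Below is a sketch of the same principle using plain JDK reflection instead of Flink's `TypeExtractor`. `TypedTag` is a hypothetical name. The sketch handles only the simple case where the anonymous class directly extends the tag with a concrete type argument.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical stand-in for OutputTag that recovers its type argument via reflection.
class TypedTag<T> {
    final String id;
    final Type elementType;

    TypedTag(String id) {
        this.id = id;
        // For `new TypedTag<String>("x") {}` the runtime class is an anonymous subclass whose
        // generic superclass is TypedTag<String>. For a plain `new TypedTag<String>("x")` the
        // runtime class is TypedTag itself, whose superclass is Object: the argument is erased.
        Type superclass = getClass().getGenericSuperclass();
        if (!(superclass instanceof ParameterizedType)) {
            throw new IllegalStateException("Could not determine the element type. "
                    + "Did you forget to make the tag an anonymous inner class?");
        }
        this.elementType = ((ParameterizedType) superclass).getActualTypeArguments()[0];
    }
}
```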
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905154#comment-15905154 ] ASF GitHub Bot commented on FLINK-4460: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105401947 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java --- @@ -539,5 +625,26 @@ public void collect(StreamRecord record) { // don't copy for the last output outputs[outputs.length - 1].collect(record); } + --- End diff -- The arguments can go on the same line as the method name.
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905136#comment-15905136 ] ASF GitHub Bot commented on FLINK-4460: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105399939 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java --- @@ -72,6 +76,11 @@ public RecordWriterOutput( --- End diff -- The bodies of the following two `collect(...)` methods are identical except for the check on the output tag. It makes sense to move the common code into a single `private` method and call that method from both of them (after doing the necessary checks). As a side comment, should we just `return` if the wrong method is called, or throw an exception? This duplicate-code pattern appears several times in the code.
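Below is a sketch of the suggested refactoring. Each public `collect(...)` does only its own tag check, and the shared body lives in one private method. It assumes that an output serves either the main stream (no tag) or exactly one side output. It also takes the "just `return`" option from the question above. All names are hypothetical: a `String` stands in for `OutputTag`, and a list stands in for the record writer.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical output serving either the main stream (tag == null) or one side output.
class TaggedOutput<T> {
    private final String outputTag; // stand-in for OutputTag; null means "main output"
    final List<T> written = new ArrayList<>(); // stand-in for the RecordWriter

    TaggedOutput(String outputTag) {
        this.outputTag = outputTag;
    }

    void collect(T record) {
        if (outputTag != null) {
            // this instance only serves a side output: ignore main-stream records
            return;
        }
        pushToWriter(record);
    }

    void collect(String tag, T record) {
        if (outputTag == null || !outputTag.equals(tag)) {
            // not the side output this instance is responsible for
            return;
        }
        pushToWriter(record);
    }

    // the body both collect(...) variants previously duplicated
    private void pushToWriter(T record) {
        written.add(record);
    }
}
```

Silently returning lets a broadcasting collector hand each record to every output and have the outputs filter for themselves. Throwing an exception would instead push that routing decision onto the caller.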
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905141#comment-15905141 ] ASF GitHub Bot commented on FLINK-4460: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105399457 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SideOutputTransformation.java --- @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.transformations; + +import static java.util.Objects.requireNonNull; + +import com.google.common.collect.Lists; +import org.apache.flink.util.OutputTag; +import org.apache.flink.streaming.api.operators.ChainingStrategy; + +import java.util.Collection; +import java.util.List; + + +/** + * This transformation represents a selection of a side output of an upstream operation with a + * given {@link OutputTag}. + * + * This does not create a physical operation, it only affects how upstream operations are + * connected to downstream operations. 
+ * + * @param The type of the elements that result from this {@code SideOutputTransformation} + */ +public class SideOutputTransformation extends StreamTransformation { + private final StreamTransformation input; --- End diff -- Leave a blank line here.
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905152#comment-15905152 ] ASF GitHub Bot commented on FLINK-4460: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105398138 --- Diff: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java --- @@ -17,13 +17,19 @@ package org.apache.flink.streaming.examples.windowing; --- End diff -- Unused imports.
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15904878#comment-15904878 ] ASF GitHub Bot commented on FLINK-4460: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105372676 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java --- @@ -419,6 +435,14 @@ public void merge(W mergeResult, registerCleanupTimer(window); } } + + // side output input event if + // element not handled by any window + // late arriving tag has been set + // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp + if(isSkippedElement && lateDataOutputTag != null && isLate(element)) { --- End diff -- I just added a test for the behaviour with a "weird" `WindowAssigner`.
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15904842#comment-15904842 ] ASF GitHub Bot commented on FLINK-4460: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105368360 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java --- @@ -419,6 +435,14 @@ public void merge(W mergeResult, registerCleanupTimer(window); } } + + // side output input event if + // element not handled by any window + // late arriving tag has been set + // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp + if(isSkippedElement && lateDataOutputTag != null && isLate(element)) { --- End diff -- I thought about this again. I think it doesn't hurt to have it because it catches the case when a `WindowAssigner` doesn't assign any windows. In that case an element is also "skipped" but it is not necessarily considered late. What do you think?
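The distinction argued above ("skipped" is not the same as "late") can be sketched in isolation. This is a hypothetical, Flink-free model: `Route`, `LateDataRouting` and all parameters are illustrative names, not the `WindowOperator` API. It assumes, as the diff comment suggests, that an element goes to the late-data side output only if no window accepted it, a late-data tag is set, and the element is late; "late" is assumed here to mean an event-time assigner and `timestamp + allowedLateness <= watermark`.

```java
/** Where a window operator sends an incoming element (hypothetical model). */
enum Route { WINDOWED, LATE_SIDE_OUTPUT, DROPPED }

final class LateDataRouting {
    private LateDataRouting() {}

    /**
     * Assumed lateness rule: only event-time assigners produce late elements, and an element is
     * late once its timestamp plus the allowed lateness is at or behind the current watermark.
     */
    static boolean isLate(boolean eventTime, long timestamp, long allowedLateness, long watermark) {
        return eventTime && timestamp + allowedLateness <= watermark;
    }

    /**
     * @param skipped        true if no assigned window accepted the element
     * @param hasLateDataTag true if a late-data output tag was configured
     */
    static Route route(boolean skipped, boolean hasLateDataTag, boolean eventTime,
                       long timestamp, long allowedLateness, long watermark) {
        if (!skipped) {
            return Route.WINDOWED;
        }
        // Skipped is not the same as late: an assigner that returns no windows also skips
        // elements, and with the isLate() check those are dropped instead of side-output.
        if (hasLateDataTag && isLate(eventTime, timestamp, allowedLateness, watermark)) {
            return Route.LATE_SIDE_OUTPUT;
        }
        return Route.DROPPED;
    }
}
```

For example, with a watermark of 200 and an allowed lateness of 10, a skipped element with timestamp 100 is routed to the late-data output, while a skipped element with timestamp 500 (e.g. one a "weird" assigner assigned to no window) is not.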
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15903561#comment-15903561 ] ASF GitHub Bot commented on FLINK-4460: --- Github user chenqin commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105235710 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java --- @@ -419,6 +435,14 @@ public void merge(W mergeResult, registerCleanupTimer(window); } } + + // side output input event if + // element not handled by any window + // late arriving tag has been set + // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp + if(isSkippedElement && lateDataOutputTag != null && isLate(element)) { --- End diff -- Thanks @kl0u, good catch! I put `isLate` there with the intention of filtering out events that are dropped for other reasons I may not be aware of. lateArrivingEvents really means events that are both `late arriving` and `dropped`. @aljoscha If that is a redundant check, we might just remove `isLate`. What do you think?
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15903460#comment-15903460 ] ASF GitHub Bot commented on FLINK-4460: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105225561 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java --- @@ -419,6 +435,14 @@ public void merge(W mergeResult, registerCleanupTimer(window); } } + + // side output input event if + // element not handled by any window + // late arriving tag has been set + // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp + if(isSkippedElement && lateDataOutputTag != null && isLate(element)) { --- End diff -- @chenqin and @aljoscha, I am starting to review the PR and I was wondering when this new `isLate()` check is needed. At least for the out-of-the-box window assigners, it seems to be a redundant check.
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902850#comment-15902850 ] ASF GitHub Bot commented on FLINK-4460: --- Github user uce commented on the issue: https://github.com/apache/flink/pull/3484 Unfortunately, I won't have time to look over this PR this week. Thanks for pinging me @aljoscha @chenqin.
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15901828#comment-15901828 ] ASF GitHub Bot commented on FLINK-4460: --- Github user chenqin commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r104997566 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java --- @@ -60,6 +60,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import com.google.common.collect.Iterables; --- End diff -- That sounds right, good catch! Thanks for fixing!
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15901801#comment-15901801 ] ASF GitHub Bot commented on FLINK-4460: --- Github user chenqin commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r104996349 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java --- @@ -85,6 +86,7 @@ private Set sources; private Set sinks; private Map> virtualSelectNodes; + private Map > virtualOutputNodes; --- End diff -- sounds good
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15901795#comment-15901795 ] ASF GitHub Bot commented on FLINK-4460: --- Github user chenqin commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r104995971 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java --- @@ -333,32 +356,41 @@ public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int ty downStreamVertexID, typeNumber, null, - new ArrayList()); + new ArrayList(), null); } private void addEdgeInternal(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber, StreamPartitioner partitioner, - List outputNames) { - + List outputNames, + OutputTag outputTag) { - if (virtualSelectNodes.containsKey(upStreamVertexID)) { + if (virtualOutputNodes.containsKey(upStreamVertexID)) { + int virtualId = upStreamVertexID; + upStreamVertexID = virtualOutputNodes.get(virtualId).f0; + if (outputTag == null) { + // selections that happen downstream override earlier selections --- End diff -- sounds good to me!
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15901094#comment-15901094 ] ASF GitHub Bot commented on FLINK-4460: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r104895136 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java --- @@ -333,32 +356,41 @@ public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int ty downStreamVertexID, typeNumber, null, - new ArrayList()); + new ArrayList(), null); } private void addEdgeInternal(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber, StreamPartitioner partitioner, - List outputNames) { - + List outputNames, + OutputTag outputTag) { - if (virtualSelectNodes.containsKey(upStreamVertexID)) { + if (virtualOutputNodes.containsKey(upStreamVertexID)) { + int virtualId = upStreamVertexID; + upStreamVertexID = virtualOutputNodes.get(virtualId).f0; + if (outputTag == null) { + // selections that happen downstream override earlier selections --- End diff -- I think the comment is a leftover from copying this code from split/select. For side outputs it can't happen that you have multiple "selects" after one another. Will remove the comment. What do you think?
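The virtual-node idea behind `addEdgeInternal` (and behind `SideOutputTransformation`, which per its Javadoc creates no physical operation) can be sketched as a tiny graph: a virtual id maps to the real upstream id plus a tag, and an edge added from a virtual id is resolved into a physical edge that carries the tag. `MiniStreamGraph`, `VirtualNode`, `Edge` and the method names are hypothetical; the diff suggests the real `StreamGraph` keeps a `Tuple2` whose `f0` is the upstream id, which this only loosely mirrors (tags are plain strings here).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** A physical edge; tag is null for the main output (hypothetical model). */
final class Edge {
    final int source;
    final int target;
    final String tag;

    Edge(int source, int target, String tag) {
        this.source = source;
        this.target = target;
        this.tag = tag;
    }
}

/** A virtual node: the real upstream operator plus the side-output tag it selects. */
final class VirtualNode {
    final int upstreamId;
    final String tag;

    VirtualNode(int upstreamId, String tag) {
        this.upstreamId = upstreamId;
        this.tag = tag;
    }
}

final class MiniStreamGraph {
    private final Map<Integer, VirtualNode> virtualSideOutputNodes = new HashMap<>();
    private final List<Edge> edges = new ArrayList<>();

    /** Registers a virtual id; no operator and no edge is created for it. */
    void addVirtualSideOutputNode(int upstreamId, int virtualId, String tag) {
        virtualSideOutputNodes.put(virtualId, new VirtualNode(upstreamId, tag));
    }

    void addEdge(int upstreamId, int downstreamId) {
        addEdgeInternal(upstreamId, downstreamId, null);
    }

    private void addEdgeInternal(int upstreamId, int downstreamId, String tag) {
        VirtualNode virtual = virtualSideOutputNodes.get(upstreamId);
        if (virtual != null) {
            // Resolve the virtual id to the real operator and carry its tag along. As noted in
            // the review, side-output selections cannot follow one another, so there is no
            // earlier tag that would have to be overridden here.
            addEdgeInternal(virtual.upstreamId, downstreamId, virtual.tag);
            return;
        }
        edges.add(new Edge(upstreamId, downstreamId, tag));
    }

    List<Edge> edges() {
        return edges;
    }
}
```

Connecting a sink to virtual node 100 (the side output "late" of operator 1) thus yields a physical edge 1 → sink tagged "late", while a normal edge from operator 1 stays untagged.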
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15900996#comment-15900996 ] ASF GitHub Bot commented on FLINK-4460: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r104881269 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java --- @@ -60,6 +60,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import com.google.common.collect.Iterables; --- End diff -- You're right, I'm changing this to simply have two loops. I think you introduced this in the first place, though.
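The diff only shows the Guava import, so what it was used for is an assumption here: presumably something like `Iterables.concat(...)` over two collections of edges. Replacing that with two plain loops, as proposed above, drops the Guava dependency and keeps the same visiting order. `TwoLoops` and `collectTargets` are hypothetical names, not the `StreamingJobGraphGenerator` code.

```java
import java.util.ArrayList;
import java.util.List;

final class TwoLoops {
    private TwoLoops() {}

    /** Visits main-output targets, then side-output targets, without a Guava dependency. */
    static List<Integer> collectTargets(List<Integer> mainOutputs, List<Integer> sideOutputs) {
        List<Integer> visited = new ArrayList<>();
        // Same order as iterating over Iterables.concat(mainOutputs, sideOutputs).
        for (Integer target : mainOutputs) {
            visited.add(target);
        }
        for (Integer target : sideOutputs) {
            visited.add(target);
        }
        return visited;
    }
}
```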
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15900991#comment-15900991 ] ASF GitHub Bot commented on FLINK-4460: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r104880615 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java --- @@ -85,6 +86,7 @@ private Set sources; private Set sinks; private Map> virtualSelectNodes; + private Map > virtualOutputNodes; --- End diff -- The method is already called `addVirtualSideOutputNode()`. I'm adjusting the name of the field. Thanks!
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15900988#comment-15900988 ] ASF GitHub Bot commented on FLINK-4460: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r104880412 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java --- @@ -416,4 +418,35 @@ private boolean canBeParallel() { transformation.setSlotSharingGroup(slotSharingGroup); return this; } + + /** +* Gets the {@link DataStream} that contains the elements that are emitted from an operation +* into the side output with the given {@link OutputTag}. +* +* Example: +* {@code +* static final OutputTag sideOutputTag = new OutputTag("side-output") {}; +* +* public void flatMap(X value, Collector out) throws Exception { --- End diff -- Fixing
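The trailing `{}` in the Javadoc example `new OutputTag(...)("side-output") {}` creates an anonymous subclass. Java erases type arguments on ordinary instances, but an anonymous subclass records its generic superclass, so the element type can be recovered at runtime; this is presumably why the example is written that way (the PR also adds a constructor taking `TypeInformation` as an alternative). The sketch shows only the underlying reflection technique with a hypothetical `TypedTag` class; it is not Flink's `OutputTag` implementation.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

/** Hypothetical tag that records its element type; must be created as an anonymous subclass. */
abstract class TypedTag<T> {
    final String id;
    final Type elementType;

    TypedTag(String id) {
        this.id = id;
        // For `new TypedTag<String>("x") {}` the anonymous class's generic superclass is
        // TypedTag<String>, and that type argument is still visible at runtime.
        Type superclass = getClass().getGenericSuperclass();
        if (!(superclass instanceof ParameterizedType)) {
            // A raw subclass carries no type argument, so fail fast.
            throw new IllegalStateException("create the tag as: new TypedTag<X>(\"id\") {}");
        }
        this.elementType = ((ParameterizedType) superclass).getActualTypeArguments()[0];
    }
}
```

Making the class abstract means forgetting the `{}` is a compile error rather than a silent loss of type information.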
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15900753#comment-15900753 ] ASF GitHub Bot commented on FLINK-4460: --- Github user chenqin commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r104847832 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java --- @@ -439,6 +450,7 @@ public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) { headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS) && (edge.getPartitioner() instanceof ForwardPartitioner) && upStreamVertex.getParallelism() == downStreamVertex.getParallelism() + && edge.getOutputTag() == null // disable chaining for side outputs --- End diff -- I remember you mentioned that side outputs work with chaining in the latest version. Is that right, or not?
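The question above is whether the extra `edge.getOutputTag() == null` clause is still needed. The PR description says side outputs were made to work with chaining, which suggests it may not be. The sketch below reduces the chaining conditions visible in the diff to plain values so the effect of that clause is easy to see; `ChainingCheck`, its parameters and the `allowSideOutputChaining` switch are hypothetical, and the real `isChainable` checks more conditions than the diff excerpt shows.

```java
/** The subset of chaining conditions visible in the diff, modelled with plain values. */
final class ChainingCheck {
    private ChainingCheck() {}

    static boolean isChainable(boolean headAllowsChaining, boolean forwardPartitioner,
                               int upstreamParallelism, int downstreamParallelism,
                               String outputTag, boolean allowSideOutputChaining) {
        boolean basic = headAllowsChaining
                && forwardPartitioner
                && upstreamParallelism == downstreamParallelism;
        // The reviewed diff adds "outputTag == null", i.e. side-output edges are never chained.
        // If chained operators can forward side outputs, that clause can be dropped.
        return basic && (allowSideOutputChaining || outputTag == null);
    }
}
```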
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15900754#comment-15900754 ] ASF GitHub Bot commented on FLINK-4460: --- Github user chenqin commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r104847733 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java --- @@ -60,6 +60,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import com.google.common.collect.Iterables; --- End diff -- Do you think introducing this dependency is a good idea or a bad idea? Up to you :)
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15900757#comment-15900757 ] ASF GitHub Bot commented on FLINK-4460: --- Github user chenqin commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r104847005 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java --- @@ -85,6 +86,7 @@ private Set sources; private Set sinks; private Map> virtualSelectNodes; + private Map > virtualOutputNodes; --- End diff -- We might consider using `addVirtualSideOutputNode` and `virtualSideOutputNodes`, unless we want to refactor and move away from the current assumption `operator` to `< ... operator < ... `.
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15900755#comment-15900755 ] ASF GitHub Bot commented on FLINK-4460: --- Github user chenqin commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r104847407 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java --- @@ -333,32 +356,41 @@ public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int ty downStreamVertexID, typeNumber, null, - new ArrayList()); + new ArrayList(), null); } private void addEdgeInternal(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber, StreamPartitioner partitioner, - List outputNames) { - + List outputNames, + OutputTag outputTag) { - if (virtualSelectNodes.containsKey(upStreamVertexID)) { + if (virtualOutputNodes.containsKey(upStreamVertexID)) { + int virtualId = upStreamVertexID; + upStreamVertexID = virtualOutputNodes.get(virtualId).f0; + if (outputTag == null) { + // selections that happen downstream override earlier selections --- End diff -- We may consider calling out this behavior in the `getSideOutput` comments.
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15900756#comment-15900756 ] ASF GitHub Bot commented on FLINK-4460: --- Github user chenqin commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r104846393 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java --- @@ -416,4 +418,35 @@ private boolean canBeParallel() { transformation.setSlotSharingGroup(slotSharingGroup); return this; } + + /** +* Gets the {@link DataStream} that contains the elements that are emitted from an operation +* into the side output with the given {@link OutputTag}. +* +* Example: +* {@code +* static final OutputTag sideOutputTag = new OutputTag("side-output") {}; +* +* public void flatMap(X value, Collector out) throws Exception { --- End diff -- The comments seem out of date; I think we already decided to get rid of CollectorWrapper.
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15899255#comment-15899255 ] ASF GitHub Bot commented on FLINK-4460: --- GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/3484 [FLINK-4460] Side Outputs in Flink This is a refinement of #2982 by @chenqin. I changed the API a bit, added support for side outputs to `ProcessFunction`, enabled side outputs to work with chaining, added proper Scala API and a Scala API test and added documentation. R: @uce @kl0u and @chenqin for review, please You can merge this pull request into a Git repository by running: $ git pull https://github.com/aljoscha/flink finish-pr-2982-side-outputs-cp Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3484.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3484 commit 1746c7e5981942dfb0e57954f8241a400b699120 Author: Chen Qin Date: 2016-10-21T19:38:04Z [FLINK-4460] Add support for side outputs This does not yet allow users to emit to side outputs in user functions. Only operators (StreamOperator) can emit to side outputs. A side output can be retrieved on a SingleOutputStreamOperator. commit 362fcb38abf525e704e97505186923ae77dc167b Author: Aljoscha Krettek Date: 2016-10-21T19:38:04Z [FLINK-4460] Add side outputs for ProcessFunction commit 3ff4f8284691e9cbfba6f4c23b4e7fe1c584df50 Author: Aljoscha Krettek Date: 2017-02-16T13:09:26Z [FLINK-4460] Make chaining work with side outputs commit 5a48cdfb67a1008bf4c7cba8ff76b2982db55a40 Author: Aljoscha Krettek Date: 2017-02-16T13:41:10Z [FLINK-4460] Expose OutputTag constructor that takes TypeInformation commit 935ff90dadad380db293265cc49d072d764cf510 Author: Chen Qin Date: 2017-03-01T14:36:17Z [FLINK-4460] Provide late-data output for window operations We use side outputs to emit dropped late data.
commit 3ae8a673c6c29fe2e110f5745ddf72deae71aafd Author: Aljoscha Krettek Date: 2017-03-07T10:06:31Z [FLINK-4460] Add proper side output API for Scala API The Scala side output API uses context bounds to get a TypeInformation for an OutputTag. This also adds a SideOutputITCase for the Scala API. commit 66182b79931016a04286279f5aba63af30442b02 Author: Aljoscha Krettek Date: 2017-03-07T11:06:01Z [FLINK-4460] Add documentation for side outputs
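The PR adds side outputs to `ProcessFunction`: a function emits regular results to its collector and extra results to a tagged side output, which downstream code retrieves with `getSideOutput(tag)`. The following is a hypothetical, Flink-free model of that data flow; `SideOutputModel`, `Tag`, `Context.output(...)` and `Fn` are illustrative stand-ins that only imitate the shape of the API described in this thread, not Flink classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical model of one operator with a main output and tagged side outputs. */
final class SideOutputModel {

    /** Typed tag identifying a side output (stand-in for an output tag). */
    static final class Tag<T> {
        final String id;

        Tag(String id) {
            this.id = id;
        }
    }

    /** What the user function sees besides the main collector. */
    interface Context {
        <X> void output(Tag<X> tag, X value);
    }

    /** A user function: emits to `out` for the main output and to `ctx` for side outputs. */
    interface Fn<I, O> {
        void processElement(I value, Context ctx, Consumer<O> out);
    }

    private final Map<String, List<Object>> sideOutputs = new HashMap<>();

    /** Runs the function over the input and returns the main output. */
    <I, O> List<O> run(List<I> input, Fn<I, O> fn) {
        List<O> main = new ArrayList<>();
        Context ctx = new Context() {
            @Override
            public <X> void output(Tag<X> tag, X value) {
                sideOutputs.computeIfAbsent(tag.id, id -> new ArrayList<>()).add(value);
            }
        };
        for (I value : input) {
            fn.processElement(value, ctx, main::add);
        }
        return main;
    }

    /** Everything emitted under the tag, or an empty list if nothing was emitted. */
    @SuppressWarnings("unchecked")
    <T> List<T> getSideOutput(Tag<T> tag) {
        List<Object> values = sideOutputs.getOrDefault(tag.id, new ArrayList<>());
        return (List<T>) (List<?>) values;
    }
}
```

For example, a function that forwards even numbers and sends odd ones to a "rejected" side output produces `[2, 4]` on the main output and `["odd:1", "odd:3"]` on the side output for the input `[1, 2, 3, 4]`. The typed `Tag<T>` is what lets `getSideOutput` return a `List<T>` rather than raw objects.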
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897560#comment-15897560 ] ASF GitHub Bot commented on FLINK-4460: --- Github user wenlong88 commented on the issue: https://github.com/apache/flink/pull/3438 Thanks for the explanation. I had this concern because we have just suggested that our users implement their jobs with `ProcessFunction`, so they will need to change their code too when we sync the commit. After all, it is really nice to have timers in more scenarios.
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897539#comment-15897539 ] ASF GitHub Bot commented on FLINK-4460: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/3438 Merged
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897534#comment-15897534 ] ASF GitHub Bot commented on FLINK-4460: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3438
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897532#comment-15897532 ] ASF GitHub Bot commented on FLINK-4460: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3438#discussion_r104446345 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java --- @@ -19,30 +19,35 @@ package org.apache.flink.streaming.api.functions; import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.functions.AbstractRichFunction; import org.apache.flink.streaming.api.TimeDomain; import org.apache.flink.streaming.api.TimerService; import org.apache.flink.util.Collector; /** * A function that processes elements of a stream. * - * The function will be called for every element in the input stream and can produce - * zero or more output. The function can also query the time and set timers. When - * reacting to the firing of set timers the function can emit yet more elements. + * For every element in the input stream {@link #processElement(Object, Context, Collector)} + * is invoked. This can produce zero or more elements as output. Implementations can also + * query the time and set timers through the provided {@link Context}. For firing timers + * {@link #onTimer(long, OnTimerContext, Collector)} will be invoked. This can again produce + * zero or more elements as output and register further timers. * - * The function will be called for every element in the input stream and can produce - * zero or more output elements. Contrary to the - * {@link org.apache.flink.api.common.functions.FlatMapFunction}, this function can also query - * the time (both event and processing) and set timers, through the provided {@link Context}. 
- * When reacting to the firing of set timers the function can directly emit a result, and/or - * register a timer that will trigger an action in the future. + * NOTE: Access to keyed state and timers (which are also scoped to a key) is only + * available if the {@code ProcessFunction} is applied on a {@code KeyedStream}. + * + * NOTE: A {@code ProcessFunction} is always a + * {@link org.apache.flink.api.common.functions.RichFunction}. Therefore, access to the + * {@link org.apache.flink.api.common.functions.RuntimeContext} as always available and setup and + * teardown methods can be implemented. See + * {@link org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)} + * and {@link org.apache.flink.api.common.functions.RichFunction#close()}. * * @param Type of the input elements. * @param Type of the output elements. */ @PublicEvolving -public interface ProcessFunction extends Function { +public abstract class ProcessFunction extends AbstractRichFunction { --- End diff -- Hi @wenlong88, in the ML discussion (https://lists.apache.org/thread.html/f3fe7d68986877994ad6b66173f40e72fc454420720a74ea5a834cc2@%3Cdev.flink.apache.org%3E) we decided to make `ProcessFunction` available on non-keyed streams as well, to allow using side outputs there. This requires giving the `onTimer()` method a default (non-abstract) implementation, otherwise every user would always have to implement it. We marked `ProcessFunction` as `@PublicEvolving` precisely for such cases; it's still a very young API and we didn't know exactly what was going to be needed in the end.
> Side Outputs in Flink > - > > Key: FLINK-4460 > URL: https://issues.apache.org/jira/browse/FLINK-4460 > Project: Flink > Issue Type: New Feature > Components: Core, DataStream API >Affects Versions: 1.2.0, 1.1.3 >Reporter: Chen Qin >Assignee: Chen Qin > Labels: latearrivingevents, sideoutput > > https://docs.google.com/document/d/1vg1gpR8JL4dM07Yu4NyerQhhVvBlde5qdqnuJv4LcV4/edit?usp=sharing -- This message was sent by Atlassian JIRA (v6.3.15#6346)
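To illustrate the design choice above, here is a minimal, self-contained sketch. It uses hypothetical stand-in types (`SketchCollector`, `SketchProcessFunction`, `LengthFunction`, `ProcessFunctionDemo`), not Flink's real classes, and assumes the code base still had to compile on Java 7, where an interface cannot carry a default `onTimer()`. Making the base an abstract class lets a user function on a non-keyed stream override only `processElement()`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a collector; not org.apache.flink.util.Collector.
interface SketchCollector<T> {
    void collect(T record);
}

// Hypothetical simplified base class mirroring the shape of the change:
// processElement() stays abstract, onTimer() gets an empty default body.
abstract class SketchProcessFunction<I, O> {
    public abstract void processElement(I value, SketchCollector<O> out) throws Exception;

    // Default no-op: functions on non-keyed streams never receive timer
    // callbacks, so they should not be forced to implement this.
    public void onTimer(long timestamp, SketchCollector<O> out) throws Exception {
    }
}

// A user function that overrides only processElement().
class LengthFunction extends SketchProcessFunction<String, Integer> {
    @Override
    public void processElement(String value, SketchCollector<Integer> out) {
        out.collect(value.length());
    }
}

class ProcessFunctionDemo {
    // Runs each input through the function and gathers everything it emits.
    static <I, O> List<O> run(SketchProcessFunction<I, O> fn, List<I> inputs) {
        List<O> results = new ArrayList<>();
        SketchCollector<O> out = results::add;
        try {
            for (I input : inputs) {
                fn.processElement(input, out);
            }
            // The inherited default onTimer() is a no-op and emits nothing.
            fn.onTimer(0L, out);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        return results;
    }
}
```

A user class that previously wrote `implements ProcessFunction<I, O>` has to switch to `extends`, which is the source incompatibility raised in the review.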
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897185#comment-15897185 ] ASF GitHub Bot commented on FLINK-4460: --- Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3438#discussion_r104399614 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java --- @@ -556,6 +558,60 @@ public ExecutionConfig getExecutionConfig() { } /** +* Applies the given {@link ProcessFunction} on the input stream, thereby +* creating a transformed output stream. +* +* The function will be called for every element in the input streams and can produce zero +* or more output elements. +* +* @param processFunction The {@link ProcessFunction} that is called for each element +* in the stream. +* +* @param <R> The type of elements emitted by the {@code ProcessFunction}. +* +* @return The transformed {@link DataStream}. +*/ + @PublicEvolving + public <R> SingleOutputStreamOperator<R> process(ProcessFunction<T, R> processFunction) { + + TypeInformation<R> outType = TypeExtractor.getUnaryOperatorReturnType( + processFunction, + ProcessFunction.class, + false, + true, + getType(), + Utils.getCallLocationName(), + true); + + return process(processFunction, outType); + } + + /** +* Applies the given {@link ProcessFunction} on the input stream, thereby +* creating a transformed output stream. +* +* The function will be called for every element in the input streams and can produce zero +* or more output elements. +* +* @param processFunction The {@link ProcessFunction} that is called for each element +* in the stream. +* @param outputType {@link TypeInformation} for the result type of the function. +* +* @param <R> The type of elements emitted by the {@code ProcessFunction}. +* +* @return The transformed {@link DataStream}. +*/ + @Internal + public <R> SingleOutputStreamOperator<R> process( --- End diff -- Makes sense to keep it like that.
The benefits of basing the Scala API on top of the Java API instead of duplicating it are very persuasive, too. So +1 to keeping it as is. I was just wondering whether users would be confused by this.
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897086#comment-15897086 ] ASF GitHub Bot commented on FLINK-4460: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/3438 @uce There is some documentation that says that `ProcessFunction` is only available on keyed streams. I'll change that.
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897065#comment-15897065 ] ASF GitHub Bot commented on FLINK-4460: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/3438 @uce There is no issue for 2.0 to track this because I don't think there is consensus about always having timestamps.
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897064#comment-15897064 ] ASF GitHub Bot commented on FLINK-4460: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3438#discussion_r104384206 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java --- @@ -19,30 +19,35 @@ package org.apache.flink.streaming.api.functions; import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.functions.AbstractRichFunction; import org.apache.flink.streaming.api.TimeDomain; import org.apache.flink.streaming.api.TimerService; import org.apache.flink.util.Collector; /** * A function that processes elements of a stream. * - * The function will be called for every element in the input stream and can produce - * zero or more output. The function can also query the time and set timers. When - * reacting to the firing of set timers the function can emit yet more elements. + * For every element in the input stream {@link #processElement(Object, Context, Collector)} + * is invoked. This can produce zero or more elements as output. Implementations can also + * query the time and set timers through the provided {@link Context}. For firing timers + * {@link #onTimer(long, OnTimerContext, Collector)} will be invoked. This can again produce + * zero or more elements as output and register further timers. * - * The function will be called for every element in the input stream and can produce - * zero or more output elements. Contrary to the - * {@link org.apache.flink.api.common.functions.FlatMapFunction}, this function can also query - * the time (both event and processing) and set timers, through the provided {@link Context}. 
- * When reacting to the firing of set timers the function can directly emit a result, and/or - * register a timer that will trigger an action in the future. + * NOTE: Access to keyed state and timers (which are also scoped to a key) is only + * available if the {@code ProcessFunction} is applied on a {@code KeyedStream}. + * + * NOTE: A {@code ProcessFunction} is always a + * {@link org.apache.flink.api.common.functions.RichFunction}. Therefore, access to the + * {@link org.apache.flink.api.common.functions.RuntimeContext} as always available and setup and --- End diff -- fixing
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897060#comment-15897060 ] ASF GitHub Bot commented on FLINK-4460: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3438#discussion_r104383800 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java --- @@ -556,6 +558,60 @@ public ExecutionConfig getExecutionConfig() { } /** +* Applies the given {@link ProcessFunction} on the input stream, thereby +* creating a transformed output stream. +* +* The function will be called for every element in the input streams and can produce zero +* or more output elements. +* +* @param processFunction The {@link ProcessFunction} that is called for each element +* in the stream. +* +* @param <R> The type of elements emitted by the {@code ProcessFunction}. +* +* @return The transformed {@link DataStream}. +*/ + @PublicEvolving + public <R> SingleOutputStreamOperator<R> process(ProcessFunction<T, R> processFunction) { + + TypeInformation<R> outType = TypeExtractor.getUnaryOperatorReturnType( + processFunction, + ProcessFunction.class, + false, + true, + getType(), + Utils.getCallLocationName(), + true); + + return process(processFunction, outType); + } + + /** +* Applies the given {@link ProcessFunction} on the input stream, thereby +* creating a transformed output stream. +* +* The function will be called for every element in the input streams and can produce zero +* or more output elements. +* +* @param processFunction The {@link ProcessFunction} that is called for each element +* in the stream. +* @param outputType {@link TypeInformation} for the result type of the function. +* +* @param <R> The type of elements emitted by the {@code ProcessFunction}. +* +* @return The transformed {@link DataStream}. +*/ + @Internal + public <R> SingleOutputStreamOperator<R> process( --- End diff -- Yes, it's exposed for that.
So far, the pattern has been for each method to also expose a public overload that takes a `TypeInformation`, because the Scala API obtains the `TypeInformation` from a context bound. Calling `transform()` manually is an option, but if we did that we would basically no longer base the Scala API on the Java API, and we would have code that instantiates the stream operators in both the Java and the Scala API. For example, right now the code for instantiating a flat map operator lives in `(Java)DataStream`, and `(Scala)DataStream.flatMap()` calls that method. What do you think?
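A rough sketch of the overload pattern described above, with hypothetical classes (`SketchTypeInfo`, `SketchMapFn`, `SketchJavaStream`, `SketchScalaStream`) standing in for `TypeInformation` and the Java and Scala `DataStream`s. The one-argument `process()` derives the output type (here via plain Java reflection, a crude stand-in for `TypeExtractor`) and delegates to the two-argument overload; the Scala-side wrapper, which already knows the type, calls that overload directly, so operator construction lives in one place.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for TypeInformation: it only records the output class.
final class SketchTypeInfo<T> {
    final Class<T> clazz;

    SketchTypeInfo(Class<T> clazz) {
        this.clazz = clazz;
    }
}

// Hypothetical user-function type. Subclassing it with concrete type arguments
// (e.g. an anonymous class) keeps them readable via reflection.
abstract class SketchMapFn<I, O> implements Function<I, O> {
}

// Plays the role of the Java DataStream.
final class SketchJavaStream<T> {
    private final List<T> elements;
    SketchTypeInfo<?> lastOutputType; // recorded only so the demo can inspect it

    SketchJavaStream(List<T> elements) {
        this.elements = elements;
    }

    // User-facing overload: derive the output type from the function's generic
    // signature, then delegate to the explicit overload.
    <R> List<R> process(SketchMapFn<T, R> fn) {
        Type[] args = ((ParameterizedType) fn.getClass().getGenericSuperclass())
                .getActualTypeArguments();
        @SuppressWarnings("unchecked")
        Class<R> outClass = (Class<R>) args[1];
        return process(fn, new SketchTypeInfo<>(outClass));
    }

    // Overload taking the type explicitly: the single place that builds the "operator".
    <R> List<R> process(SketchMapFn<T, R> fn, SketchTypeInfo<R> outType) {
        lastOutputType = outType;
        List<R> out = new ArrayList<>();
        for (T element : elements) {
            out.add(fn.apply(element));
        }
        return out;
    }
}

// Plays the role of the Scala DataStream: it already knows the output type
// (in Scala, from an implicit context bound) and reuses the Java code path.
final class SketchScalaStream<T> {
    private final SketchJavaStream<T> javaStream;

    SketchScalaStream(SketchJavaStream<T> javaStream) {
        this.javaStream = javaStream;
    }

    <R> List<R> process(SketchMapFn<T, R> fn, Class<R> knownType) {
        return javaStream.process(fn, new SketchTypeInfo<>(knownType));
    }
}
```

The alternative uce raises, calling `transform()` from the Scala side, would mean duplicating the body of the explicit overload there.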
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897031#comment-15897031 ] ASF GitHub Bot commented on FLINK-4460: --- Github user uce commented on the issue: https://github.com/apache/flink/pull/3438 @rmetzger @aljoscha I agree with Aljoscha that your point is independent of this PR. Is there an issue for 2.0 to track this?
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897029#comment-15897029 ] ASF GitHub Bot commented on FLINK-4460: --- Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3438#discussion_r104377167 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java --- @@ -556,6 +558,60 @@ public ExecutionConfig getExecutionConfig() { } /** +* Applies the given {@link ProcessFunction} on the input stream, thereby +* creating a transformed output stream. +* +* The function will be called for every element in the input streams and can produce zero +* or more output elements. +* +* @param processFunction The {@link ProcessFunction} that is called for each element +* in the stream. +* +* @param <R> The type of elements emitted by the {@code ProcessFunction}. +* +* @return The transformed {@link DataStream}. +*/ + @PublicEvolving + public <R> SingleOutputStreamOperator<R> process(ProcessFunction<T, R> processFunction) { + + TypeInformation<R> outType = TypeExtractor.getUnaryOperatorReturnType( + processFunction, + ProcessFunction.class, + false, + true, + getType(), + Utils.getCallLocationName(), + true); + + return process(processFunction, outType); + } + + /** +* Applies the given {@link ProcessFunction} on the input stream, thereby +* creating a transformed output stream. +* +* The function will be called for every element in the input streams and can produce zero +* or more output elements. +* +* @param processFunction The {@link ProcessFunction} that is called for each element +* in the stream. +* @param outputType {@link TypeInformation} for the result type of the function. +* +* @param <R> The type of elements emitted by the {@code ProcessFunction}. +* +* @return The transformed {@link DataStream}.
+*/ + @Internal + public <R> SingleOutputStreamOperator<R> process( --- End diff -- Is this internal method only exposed as `public` for the Scala API? If yes, I'm wondering whether it makes sense to call `transform` manually in the Scala `DataStream` API.
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897028#comment-15897028 ] ASF GitHub Bot commented on FLINK-4460: --- Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3438#discussion_r104378368 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java --- @@ -19,30 +19,35 @@ package org.apache.flink.streaming.api.functions; import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.functions.AbstractRichFunction; import org.apache.flink.streaming.api.TimeDomain; import org.apache.flink.streaming.api.TimerService; import org.apache.flink.util.Collector; /** * A function that processes elements of a stream. * - * The function will be called for every element in the input stream and can produce - * zero or more output. The function can also query the time and set timers. When - * reacting to the firing of set timers the function can emit yet more elements. + * For every element in the input stream {@link #processElement(Object, Context, Collector)} + * is invoked. This can produce zero or more elements as output. Implementations can also + * query the time and set timers through the provided {@link Context}. For firing timers + * {@link #onTimer(long, OnTimerContext, Collector)} will be invoked. This can again produce + * zero or more elements as output and register further timers. * - * The function will be called for every element in the input stream and can produce - * zero or more output elements. Contrary to the - * {@link org.apache.flink.api.common.functions.FlatMapFunction}, this function can also query - * the time (both event and processing) and set timers, through the provided {@link Context}. 
- * When reacting to the firing of set timers the function can directly emit a result, and/or - * register a timer that will trigger an action in the future. + * NOTE: Access to keyed state and timers (which are also scoped to a key) is only + * available if the {@code ProcessFunction} is applied on a {@code KeyedStream}. + * + * NOTE: A {@code ProcessFunction} is always a + * {@link org.apache.flink.api.common.functions.RichFunction}. Therefore, access to the + * {@link org.apache.flink.api.common.functions.RuntimeContext} as always available and setup and + * teardown methods can be implemented. See + * {@link org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)} + * and {@link org.apache.flink.api.common.functions.RichFunction#close()}. * * @param <I> Type of the input elements. * @param <O> Type of the output elements. */ @PublicEvolving -public interface ProcessFunction<I, O> extends Function { +public abstract class ProcessFunction<I, O> extends AbstractRichFunction { --- End diff -- Missing `serialVersionUID`.
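A small sketch of what the review asks for, using hypothetical class names: each serializable class in the hierarchy declares an explicit `serialVersionUID`. Flink user functions are serialized when a job is submitted; without an explicit UID, the JVM derives one from the class structure, so otherwise compatible changes (or a different compiler) can make previously serialized instances fail to deserialize.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

// Hypothetical stand-in for a serializable base such as AbstractRichFunction.
abstract class SketchRichFunction implements Serializable {
    private static final long serialVersionUID = 1L;
}

// The new abstract subclass pins its own serialVersionUID instead of letting
// the JVM derive one from the class structure.
abstract class SketchProcessFunctionBase extends SketchRichFunction {
    private static final long serialVersionUID = 1L;
}

// A user function carrying configuration that must survive shipping.
class PrefixFunction extends SketchProcessFunctionBase {
    private static final long serialVersionUID = 1L;
    final String prefix;

    PrefixFunction(String prefix) {
        this.prefix = prefix;
    }
}

class SerializationDemo {
    // Round-trips an object through Java serialization.
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // The UID the serialization machinery will use for the given class.
    static long uidOf(Class<?> cls) {
        return ObjectStreamClass.lookup(cls).getSerialVersionUID();
    }
}
```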
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897027#comment-15897027 ] ASF GitHub Bot commented on FLINK-4460: --- Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3438#discussion_r104377392 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java --- @@ -19,30 +19,35 @@ package org.apache.flink.streaming.api.functions; import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.functions.AbstractRichFunction; import org.apache.flink.streaming.api.TimeDomain; import org.apache.flink.streaming.api.TimerService; import org.apache.flink.util.Collector; /** * A function that processes elements of a stream. * - * The function will be called for every element in the input stream and can produce - * zero or more output. The function can also query the time and set timers. When - * reacting to the firing of set timers the function can emit yet more elements. + * For every element in the input stream {@link #processElement(Object, Context, Collector)} + * is invoked. This can produce zero or more elements as output. Implementations can also + * query the time and set timers through the provided {@link Context}. For firing timers + * {@link #onTimer(long, OnTimerContext, Collector)} will be invoked. This can again produce + * zero or more elements as output and register further timers. * - * The function will be called for every element in the input stream and can produce - * zero or more output elements. Contrary to the - * {@link org.apache.flink.api.common.functions.FlatMapFunction}, this function can also query - * the time (both event and processing) and set timers, through the provided {@link Context}. 
- * When reacting to the firing of set timers the function can directly emit a result, and/or - * register a timer that will trigger an action in the future. + * NOTE: Access to keyed state and timers (which are also scoped to a key) is only + * available if the {@code ProcessFunction} is applied on a {@code KeyedStream}. + * + * NOTE: A {@code ProcessFunction} is always a + * {@link org.apache.flink.api.common.functions.RichFunction}. Therefore, access to the + * {@link org.apache.flink.api.common.functions.RuntimeContext} as always available and setup and --- End diff -- typo: as => is?
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1589#comment-1589 ] ASF GitHub Bot commented on FLINK-4460: --- Github user wenlong88 commented on a diff in the pull request: https://github.com/apache/flink/pull/3438#discussion_r104334699 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java --- @@ -19,30 +19,35 @@ package org.apache.flink.streaming.api.functions; import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.functions.AbstractRichFunction; import org.apache.flink.streaming.api.TimeDomain; import org.apache.flink.streaming.api.TimerService; import org.apache.flink.util.Collector; /** * A function that processes elements of a stream. * - * The function will be called for every element in the input stream and can produce - * zero or more output. The function can also query the time and set timers. When - * reacting to the firing of set timers the function can emit yet more elements. + * For every element in the input stream {@link #processElement(Object, Context, Collector)} + * is invoked. This can produce zero or more elements as output. Implementations can also + * query the time and set timers through the provided {@link Context}. For firing timers + * {@link #onTimer(long, OnTimerContext, Collector)} will be invoked. This can again produce + * zero or more elements as output and register further timers. * - * The function will be called for every element in the input stream and can produce - * zero or more output elements. Contrary to the - * {@link org.apache.flink.api.common.functions.FlatMapFunction}, this function can also query - * the time (both event and processing) and set timers, through the provided {@link Context}. 
- * When reacting to the firing of set timers the function can directly emit a result, and/or - * register a timer that will trigger an action in the future. + * NOTE: Access to keyed state and timers (which are also scoped to a key) is only + * available if the {@code ProcessFunction} is applied on a {@code KeyedStream}. + * + * NOTE: A {@code ProcessFunction} is always a + * {@link org.apache.flink.api.common.functions.RichFunction}. Therefore, access to the + * {@link org.apache.flink.api.common.functions.RuntimeContext} as always available and setup and + * teardown methods can be implemented. See + * {@link org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)} + * and {@link org.apache.flink.api.common.functions.RichFunction#close()}. * * @param <I> Type of the input elements. * @param <O> Type of the output elements. */ @PublicEvolving -public interface ProcessFunction<I, O> extends Function { +public abstract class ProcessFunction<I, O> extends AbstractRichFunction { --- End diff -- Hi, changing from `interface` to `class` is incompatible on the user side. Can't `ProcessFunction` just extend `RichFunction`?
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895742#comment-15895742 ] ASF GitHub Bot commented on FLINK-4460: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/3438 I think the discussion of timestamps and additional interfaces is orthogonal to this PR: `KeyedProcessOperator` is a renaming of the pre-existing `ProcessOperator`, and the new `ProcessOperator` is a simplification that does away with timers. The interface for timestamps already exists in the current code base; if we want to change that, we should open separate Jira issues.
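A hedged sketch of the keyed/non-keyed split, using hypothetical types rather than Flink's `TimerService`: the keyed variant accepts timer registrations, while the non-keyed one can still report the current time but rejects timers, in line with the new Javadoc note that timers are only available on a `KeyedStream`. Throwing `UnsupportedOperationException` is this sketch's own choice, not necessarily what the real `ProcessOperator` does.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified timer service; not Flink's TimerService.
interface SketchTimerService {
    long currentProcessingTime();

    void registerProcessingTimeTimer(long time);
}

// Keyed variant: timers are accepted and remembered
// (in Flink they would additionally be scoped to the current key).
final class KeyedTimers implements SketchTimerService {
    final List<Long> registered = new ArrayList<>();

    public long currentProcessingTime() {
        return System.currentTimeMillis();
    }

    public void registerProcessingTimeTimer(long time) {
        registered.add(time);
    }
}

// Non-keyed variant: time can still be queried, but there is no key to
// scope a timer to, so registration is rejected.
final class NonKeyedTimers implements SketchTimerService {
    public long currentProcessingTime() {
        return System.currentTimeMillis();
    }

    public void registerProcessingTimeTimer(long time) {
        throw new UnsupportedOperationException("Timers are only supported on keyed streams.");
    }
}

final class TimerDemo {
    // Returns true if the service accepted the timer registration.
    static boolean tryRegister(SketchTimerService timers, long time) {
        try {
            timers.registerProcessingTimeTimer(time);
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }
}
```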
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895737#comment-15895737 ] ASF GitHub Bot commented on FLINK-4460: --- Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/3438 In addition to throwing an exception, we should also expose `element.hasTimestamp()` to offer our users a clean way of checking for timestamps. Let's see what @uce or other reviewers think about this.
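One way the proposal could look, sketched with a hypothetical `SketchElement` class (not an actual Flink type): the timestamp is stored as a primitive `long` plus a flag, `hasTimestamp()` lets callers check cheaply, and `getTimestamp()` throws if no timestamp was set.

```java
// Hypothetical element wrapper following the proposal; not an actual Flink class.
final class SketchElement<T> {
    private final T value;
    private final long timestamp; // primitive: no boxing per element
    private final boolean hasTimestamp;

    private SketchElement(T value, long timestamp, boolean hasTimestamp) {
        this.value = value;
        this.timestamp = timestamp;
        this.hasTimestamp = hasTimestamp;
    }

    static <T> SketchElement<T> withTimestamp(T value, long timestamp) {
        return new SketchElement<>(value, timestamp, true);
    }

    static <T> SketchElement<T> withoutTimestamp(T value) {
        return new SketchElement<>(value, 0L, false);
    }

    T getValue() {
        return value;
    }

    boolean hasTimestamp() {
        return hasTimestamp;
    }

    // Accessing a missing timestamp is an error rather than a silent null.
    long getTimestamp() {
        if (!hasTimestamp) {
            throw new IllegalStateException("Element has no timestamp.");
        }
        return timestamp;
    }
}

final class TimestampDemo {
    // Callers check hasTimestamp() first, so the exception path is not hit in normal use.
    static long timestampOr(SketchElement<?> element, long fallback) {
        return element.hasTimestamp() ? element.getTimestamp() : fallback;
    }
}
```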
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895553#comment-15895553 ] ASF GitHub Bot commented on FLINK-4460: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/3438 @rmetzger Yes, it's unfortunate that in our model not all elements always have a timestamp. The other alternative is throwing an exception when trying to access a non-existing timestamp.
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15894999#comment-15894999 ] ASF GitHub Bot commented on FLINK-4460: --- Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/3438 I looked over the changes and didn't find anything critical. The only thing that made me think was the boxed `Long` type for `timestamp()`. I assume you chose this approach to signal timestamp unavailability using `null`. The Java documentation advises against relying on autoboxing in performance-critical code: http://docs.oracle.com/javase/1.5.0/docs/guide/language/autoboxing.html Tests and the Scala API were done. I assume that we don't need to explicitly mention support for the process function on non-keyed streams.
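To make the concern concrete, a minimal sketch of the nullable boxed-`Long` approach (hypothetical `BoxedContext` and `BoxedDemo`, not Flink's `Context`): every caller has to null-check before unboxing, and unboxing a `null` throws `NullPointerException`. On the performance side, `Long.valueOf` is only guaranteed to cache values from -128 to 127, so realistic millisecond timestamps typically mean a new object per element.

```java
// Hypothetical context exposing the timestamp as a nullable boxed Long
// (null = no timestamp), mirroring the approach questioned in the review.
final class BoxedContext {
    private final Long timestamp;

    BoxedContext(Long timestamp) {
        this.timestamp = timestamp;
    }

    Long timestamp() {
        return timestamp;
    }
}

final class BoxedDemo {
    // Sums the timestamps that are present; the null check is mandatory.
    static long sumTimestamps(BoxedContext[] contexts) {
        long sum = 0;
        for (BoxedContext ctx : contexts) {
            Long ts = ctx.timestamp();
            if (ts != null) {
                sum += ts; // auto-unboxing
            }
        }
        return sum;
    }

    // Shows the pitfall: unboxing a missing timestamp without a null check.
    static boolean unboxingNullThrows() {
        Long missing = new BoxedContext(null).timestamp();
        try {
            long ignored = missing; // auto-unboxing a null reference
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }
}
```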
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15891296#comment-15891296 ] ASF GitHub Bot commented on FLINK-4460: --- Github user chenqin commented on the issue: https://github.com/apache/flink/pull/2982 @aljoscha Nice! Let's do "Chen Qin qinnc...@gmail.com"
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15890301#comment-15890301 ] ASF GitHub Bot commented on FLINK-4460: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/2982 @chenqin Would you like your commits to be attributed to "Chen Qin" or "Chen Qin "? I see both in your set of commits. I'm finally putting everything together and will hopefully merge soon, since the mailing list discussion seems to favour our approach.
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15890141#comment-15890141 ] ASF GitHub Bot commented on FLINK-4460: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/3438 R: @uce or @rmetzger for review, please
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15890140#comment-15890140 ] ASF GitHub Bot commented on FLINK-4460: --- GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/3438 [FLINK-4460] Allow ProcessFunction on non-keyed streams This is in preparation for side outputs, which will only work on `ProcessFunction`. We still want side outputs on non-keyed streams so we have to make `ProcessFunction` available there. See this ML thread for reference: https://lists.apache.org/thread.html/f3fe7d68986877994ad6b66173f40e72fc454420720a74ea5a834cc2@%3Cdev.flink.apache.org%3E You can merge this pull request into a Git repository by running: $ git pull https://github.com/aljoscha/flink jira-4460-process-for-everyone Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3438.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3438 commit 38f33f2399598b521a7e34e8dea1d236f5672042 Author: Aljoscha Krettek Date: 2017-03-01T09:57:12Z [FLINK-4460] Make ProcessFunction abstract, add default onTime() method This is in preparation of allowing ProcessFunction on DataStream because we will use it to allow side outputs from the ProcessFunction Context. commit f0dd2c0d81a847cfa4f3d241ce874db6807caee2 Author: Aljoscha Krettek Date: 2017-03-01T10:41:02Z [FLINK-4660] Allow ProcessFunction on DataStream Introduce new ProcessOperator for this. Rename the pre-existing ProcessOperator to KeyedProcessOperator. commit 29ca9b4b794522b35d84d9f19edc5b0bb9f64912 Author: Aljoscha Krettek Date: 2017-03-01T10:33:03Z [FLINK-4460] Make CoProcessFunction abstract, add default onTime() method This is in preparation of allowing CoProcessFunction on a non-keyed connected stream. we will use it to allow side outputs from the ProcessFunction Context.
commit a26accf8feebaf9d3566055c0d1eb3006986bfd6 Author: Aljoscha Krettek Date: 2017-03-01T11:02:34Z [FLINK-4660] Allow CoProcessFunction on non-keyed ConnectedStreams Introduce new CoProcessOperator for this. Rename the pre-existing CoProcessOperator to KeyedCoProcessOperator. > Side Outputs in Flink > - > > Key: FLINK-4460 > URL: https://issues.apache.org/jira/browse/FLINK-4460 > Project: Flink > Issue Type: New Feature > Components: Core, DataStream API >Affects Versions: 1.2.0, 1.1.3 >Reporter: Chen Qin >Assignee: Chen Qin > Labels: latearrivingevents, sideoutput > > https://docs.google.com/document/d/1vg1gpR8JL4dM07Yu4NyerQhhVvBlde5qdqnuJv4LcV4/edit?usp=sharing -- This message was sent by Atlassian JIRA (v6.3.15#6346)
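To make the idea in this PR concrete (side outputs emitted through the `ProcessFunction` context, on a stream that is not keyed), here is a minimal plain-Java toy model with no Flink dependency. `ToyOutputTag`, `ToyProcessFunction`, `ToyProcessOperator` and `SideOutputDemo` are hypothetical stand-ins invented for illustration; in the Flink 1.3 API the corresponding pieces are `OutputTag`, `ProcessFunction.Context#output(OutputTag, value)` and `SingleOutputStreamOperator#getSideOutput(OutputTag)`, whose real signatures and runtime behavior differ from this sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a typed side-output tag (not Flink's OutputTag).
final class ToyOutputTag<T> {
    final String id;

    ToyOutputTag(String id) {
        this.id = id;
    }
}

// Hypothetical stand-in for a ProcessFunction: main results go to `out`,
// side results go through the Context, as the PR description proposes.
abstract class ToyProcessFunction<I, O> {
    interface Context {
        <X> void output(ToyOutputTag<X> tag, X value);
    }

    abstract void processElement(I value, Context ctx, List<O> out);
}

// Runs a function over a non-keyed input and keeps one buffer per side output.
final class ToyProcessOperator<I, O> {
    final List<O> mainOutput = new ArrayList<>();
    private final Map<String, List<Object>> sideOutputs = new HashMap<>();

    void run(ToyProcessFunction<I, O> fn, List<I> input) {
        ToyProcessFunction.Context ctx = new ToyProcessFunction.Context() {
            @Override
            public <X> void output(ToyOutputTag<X> tag, X value) {
                sideOutputs.computeIfAbsent(tag.id, k -> new ArrayList<>()).add(value);
            }
        };
        for (I value : input) {
            fn.processElement(value, ctx, mainOutput);
        }
    }

    @SuppressWarnings("unchecked")
    <X> List<X> getSideOutput(ToyOutputTag<X> tag) {
        List<Object> buffer = sideOutputs.getOrDefault(tag.id, new ArrayList<>());
        return (List<X>) (List<?>) buffer;
    }
}

// Example function: parseable strings go to the main output, the rest to a side output.
final class SideOutputDemo {
    static final ToyOutputTag<String> REJECTED = new ToyOutputTag<>("rejected");

    static ToyProcessOperator<String, Integer> run(List<String> input) {
        ToyProcessOperator<String, Integer> op = new ToyProcessOperator<>();
        op.run(new ToyProcessFunction<String, Integer>() {
            @Override
            void processElement(String value, Context ctx, List<Integer> out) {
                try {
                    out.add(Integer.parseInt(value));
                } catch (NumberFormatException e) {
                    ctx.output(REJECTED, value);
                }
            }
        }, input);
        return op;
    }
}
```

With input `["1", "x", "3"]`, the main output holds `[1, 3]` and the `rejected` side output holds `["x"]`. Nothing in the sketch depends on a key, which is the point of making `ProcessFunction` usable on non-keyed streams.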
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15880467#comment-15880467 ] ASF GitHub Bot commented on FLINK-4460:
---
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2982

Thanks for looking at that! I'll open a new discussion thread on the mailing lists to discuss side outputs and `split`/`select`, and how we're going to proceed with that.

Regarding your other questions: I think we might add such an `Evaluator` interface in the future, but for now I would like to keep it simple and see if that works for people. And yes, a user would have to use `allowedLateness` and `sideOutputLateData` together if they want to use late data; alternatively, they can keep the default allowed lateness of zero and still get the late data as a side output.
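The interplay between `allowedLateness` and late-data side outputs comes down to a watermark comparison. Below is a simplified plain-Java model for tumbling event-time windows, assuming (to my understanding of the window operator) that a window counts as late once its max timestamp plus the allowed lateness is at or below the current watermark. `LatenessSketch` and its methods are hypothetical names, non-negative timestamps and a zero window offset are assumed, and this is not Flink code.

```java
// Simplified event-time lateness model for tumbling windows (not Flink code).
final class LatenessSketch {
    // Start of the tumbling window of length `size` containing `ts`; assumes ts >= 0 and no offset.
    static long windowStart(long ts, long size) {
        return ts - (ts % size);
    }

    // Largest timestamp that still belongs to the window [start, start + size).
    static long maxTimestamp(long start, long size) {
        return start + size - 1;
    }

    // Assumed rule: the window is late once maxTimestamp + allowedLateness <= watermark.
    // An element arriving for such a window is dropped, or redirected if a late-data
    // side output has been requested.
    static boolean isLate(long ts, long size, long allowedLateness, long watermark) {
        long cleanupTime = maxTimestamp(windowStart(ts, size), size) + allowedLateness;
        return cleanupTime <= watermark;
    }
}
```

Worked example: an element with timestamp 5 and windows of size 10 falls into `[0, 10)`, whose max timestamp is 9. With the default allowed lateness of 0 it is late as soon as the watermark reaches 9; with an allowed lateness of 5 it only becomes late once the watermark reaches 14, which is why the two settings can be combined.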
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15872353#comment-15872353 ] ASF GitHub Bot commented on FLINK-4460:
---
Github user chenqin commented on the issue:

https://github.com/apache/flink/pull/2982

@aljoscha Looks good to me. I briefly looked at your git branch; a minor comment would be to add comments to `sideOutputLateData` so users get a better idea of what they get when they opt in to the late-arriving event stream.

As it stands, whether an event arrives late is decided by comparing the watermark and the event time. Do you think there is a need to let users pass some kind of `Evaluator`, so that they can route any kind of data to side outputs?

`window.sideOutput(OutputTag, Evaluator)`
`interface Evaluator{ MergedWindows, key, watermark}`

- Regarding `split`/`select`, I think there is a chance to consolidate `select` and build it upon `OutputTag`, but that might be out of this PR's scope.
- Regarding `WindowedStream`, I am a bit confused about whether I can use `allowedLateness` and `sideOutputLateData` at the same time.

Thanks,
Chen
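The `Evaluator` idea above is only given as pseudocode, and the interface was only proposed in this thread, not adopted. The following hypothetical plain-Java rendering shows how such a pluggable rule might look. The name `toSideOutput`, passing the window as its max timestamp, and adding the element timestamp as a parameter are all assumptions made for illustration; none of this is a Flink API.

```java
// Hypothetical shape of the proposed Evaluator; this is NOT a Flink interface.
// Parameters loosely follow the comment's list (key, window, watermark); passing the
// window as its max timestamp and adding the element timestamp are assumptions.
interface Evaluator<K> {
    boolean toSideOutput(K key, long windowMaxTimestamp, long watermark, long timestamp);
}

final class EvaluatorSketch {
    // Default rule equivalent to a plain watermark comparison with zero allowed lateness.
    static <K> Evaluator<K> watermarkRule() {
        return (key, windowMaxTimestamp, watermark, timestamp) -> windowMaxTimestamp <= watermark;
    }

    // Example custom rule: keep the default, but also divert every element of one key.
    static <K> Evaluator<K> divertKey(K divertedKey) {
        Evaluator<K> base = watermarkRule();
        return (key, max, wm, ts) -> base.toSideOutput(key, max, wm, ts) || divertedKey.equals(key);
    }
}
```

The default rule reproduces the built-in late-data behavior, while a custom evaluator could add arbitrary routing conditions; that extra generality is the trade-off against the simpler `sideOutputLateData()` approach.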
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15872032#comment-15872032 ] ASF GitHub Bot commented on FLINK-4460:
---
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2982

@chenqin I rebased and consolidated your PR a bit and played around with the APIs. Some of the changes:

- Separation into internal changes/window operator changes and user-function changes. I have a prototype commit that exposes side outputs using `ProcessFunction`.
- I removed `CollectorWrapper`/`RichCollector` in favour of the `ProcessFunction` approach.
- The internal implementation no longer stores the `OutputTag` on the `StreamRecord`; instead, it adds an additional method on `Output` that should be used for emitting data to a side output. Side outputs now also work with chaining.
- `WindowedStream` gets a new `sideOutputLateData()` method that is used to specify that a late-data side output is required. This is more general than putting it into the method signature of `apply()` because it will simply work for all the different window types.

I quickly talked to @StephanEwen and we agreed that we need to think further about how we want to expose side outputs for user-defined functions. In particular, we have to think about what this addition means for the `split()`/`select()` pattern.

I will also make another change so that `sideOutputLateData()` is not required; instead, the late-data side output will be added when the late-data stream is requested.

What do you think?
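The internal change described above (routing by which `Output` method is called, rather than by a tag stored on each record) can be sketched in plain Java. As far as I know, Flink's `Output` interface gained a `collect(OutputTag, StreamRecord)` method along these lines, but `SideTag`, `ToyOutput`, `EdgeOutput` and `BroadcastOutput` below are hypothetical, simplified stand-ins, and the per-edge filtering is only one plausible way to make side outputs work across chained operators.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins; not Flink's Output or OutputTag classes.
final class SideTag<T> {
    final String id;

    SideTag(String id) {
        this.id = id;
    }
}

// Records carry no tag; the caller picks the main or the side channel by method.
interface ToyOutput<T> {
    void collect(T record);

    <X> void collect(SideTag<X> tag, X record);
}

// One downstream edge: the main edge (tag == null) or the edge for one side output.
final class EdgeOutput<T> implements ToyOutput<T> {
    final SideTag<?> tag;
    final List<Object> received = new ArrayList<>();

    EdgeOutput(SideTag<?> tag) {
        this.tag = tag;
    }

    @Override
    public void collect(T record) {
        if (tag == null) {
            received.add(record); // main records only reach the main edge
        }
    }

    @Override
    public <X> void collect(SideTag<X> t, X record) {
        if (tag != null && tag.id.equals(t.id)) {
            received.add(record); // side records only reach the edge with a matching tag
        }
    }
}

// Fans each call out to every edge; each edge decides whether to accept it.
final class BroadcastOutput<T> implements ToyOutput<T> {
    private final List<EdgeOutput<T>> edges;

    BroadcastOutput(List<EdgeOutput<T>> edges) {
        this.edges = edges;
    }

    @Override
    public void collect(T record) {
        for (EdgeOutput<T> edge : edges) {
            edge.collect(record);
        }
    }

    @Override
    public <X> void collect(SideTag<X> t, X record) {
        for (EdgeOutput<T> edge : edges) {
            edge.collect(t, record);
        }
    }
}
```

Because the tag travels as a method argument instead of a field on every record, main-output records pay no extra cost, and a side output can even carry a different type (`X`) than the main output (`T`).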
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15868176#comment-15868176 ] ASF GitHub Bot commented on FLINK-4460:
---
Github user chenqin commented on the issue:

https://github.com/apache/flink/pull/2982

@aljoscha, the diff has been updated and merged to support `apply(Function, lateElementsOutput);`. Refactoring `Collector` does not seem to be a viable option, as stated above.

Travis CI seems to have timed out here; is there a way to increase the timeout setting? https://travis-ci.org/chenqin/flink/builds/201329246

Chen
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15746680#comment-15746680 ] ASF GitHub Bot commented on FLINK-4460:
---
Github user chenqin commented on the issue:

https://github.com/apache/flink/pull/2982

@aljoscha Thanks for your time. We can chat more after the 1.2 release!

I think it makes sense to extend `Collector`, even though we may not be able to remove `collect(T obj)` due to API compatibility concerns in 1.x, per @fhueske's comments in the [FLIP-13 email thread](http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Discuss-FLIP-13-Side-Outputs-in-Flink-td14204.html).

The only point I would like to add: there seems to be a decent amount of refactoring needed to replace the underlying output chains with `collect(tag, element)`, yet it seems a reasonable investment moving forward (multiple inputs/multiple outputs).

The `tooLateEvents()` method was added for the user's convenience; it should be fine to remove it if it doesn't bring much benefit.

`LateArrivingTag` shares the same type as the input (which is already fixed once the input type is defined), so adding the late-arriving tag within the `apply` method seems redundant. In fact, without any changes to this diff, users would also be able to access late-arriving events in the following way:

```
OutputTag lateElementsOutput = new LateArrivingOutputTag();
DataStream input = ...
SingleOutputStreamOperator windowed = input
    .keyBy(...)
    .window(...)
    .apply(Function);
DataStream lateElements = windowed.getSideOutput(lateElementsOutput);
```

Thanks,
Chen
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15745298#comment-15745298 ] ASF GitHub Bot commented on FLINK-4460:
---
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2982

@chenqin I had a quick look at the implementation and it looks quite good. I'll look at it in more detail once the 1.2 release is out, and then I'll also have more thorough comments. These are some quick comments off the top of my head:

- I think we can extend `Collector` with a `collect(OutputTag, T)` method. Then we wouldn't need the extra `RichCollector` and `CollectorWrapper` to work around that.
- For `WindowedStream` I would like to have something like this:

```
OutputTag lateElementsOutput = ...;
DataStream input = ...
SingleOutputStreamOperator windowed = input
    .keyBy(...)
    .window(...)
    .apply(Function, lateElementsOutput);
DataStream lateElements = windowed.getSideOutput(lateElementsOutput);
```

or maybe something else if we find a better idea. With `WindowedStream.tooLateElements()`, we would instantiate an extra `WindowOperator` just for getting late elements, while another window operator would be responsible for processing the actual elements. That seems wasteful.

What do you think?