[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user aljoscha closed the pull request at: https://github.com/apache/flink/pull/3484 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3484#discussion_r106685158

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java ---
    @@ -48,15 +49,25 @@
          * output selection).
          */
         private final List selectedNames;
    +
    +    /**
    +     * The side-output tag (if any) of this {@link StreamEdge}.
    +     */
    +    private final OutputTag outputTag;
    +
    +    /**
    +     * The {@link StreamPartitioner} on this {@link StreamEdge}.
    +     */
         private StreamPartitioner outputPartitioner;

         public StreamEdge(StreamNode sourceVertex, StreamNode targetVertex, int typeNumber,
    -            List selectedNames, StreamPartitioner outputPartitioner) {
    +            List selectedNames, StreamPartitioner outputPartitioner, OutputTag outputTag) {
             this.sourceVertex = sourceVertex;
             this.targetVertex = targetVertex;
             this.typeNumber = typeNumber;
             this.selectedNames = selectedNames;
             this.outputPartitioner = outputPartitioner;
    +        this.outputTag = outputTag;
    --- End diff --

    I am not sure exactly what the edge id does or who uses it, so I would prefer not to touch it for now.
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3484#discussion_r106684413

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---
    @@ -567,6 +600,17 @@ protected boolean isLate(W window) {
         }

         /**
    +     * Decide if a record is currently late, based on current watermark and allowed lateness.
    +     *
    +     * @param element The element to check
    +     * @return The element for which should be considered when sideoutputs
    +     */
    +    protected boolean isLate(StreamRecord element){
    --- End diff --

    That is what I remember as well.
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3484#discussion_r106684096

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---
    @@ -567,6 +600,17 @@ protected boolean isLate(W window) {
         }

         /**
    +     * Decide if a record is currently late, based on current watermark and allowed lateness.
    +     *
    +     * @param element The element to check
    +     * @return The element for which should be considered when sideoutputs
    +     */
    +    protected boolean isLate(StreamRecord element){
    --- End diff --

    I must have removed the check by accident. I think we agreed to rename this to something more meaningful and keep it, right?
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3484#discussion_r106162497

    --- Diff: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/sideoutput/SideOutputExample.java ---
    @@ -0,0 +1,139 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.examples.sideoutput;
    +
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.streaming.api.TimeCharacteristic;
    +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    +import org.apache.flink.streaming.api.functions.ProcessFunction;
    +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    +import org.apache.flink.streaming.api.windowing.time.Time;
    +import org.apache.flink.util.OutputTag;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.utils.ParameterTool;
    +import org.apache.flink.examples.java.wordcount.util.WordCountData;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.util.Collector;
    +
    +/**
    + * An example that illustrates the use of side outputs.
    + *
    + * This is a modified version of {@link org.apache.flink.streaming.examples.windowing.WindowWordCount}
    + * that has a filter in the tokenizer and only emits some words for counting
    + * while emitting the other words to a side output.
    + */
    +public class SideOutputExample {
    +
    +    /**
    +     * We need to create an {@link OutputTag} so that we can reference it when emitting
    +     * data to a side output and also to retrieve the side output stream from an operation.
    +     */
    +    static final OutputTag rejectedWordsTag = new OutputTag("rejected") {};
    --- End diff --

    Here we add a side output, but we do nothing to show that it works. We could probably add a "rejected-" prefix to the record and print it, so that the user can see what the side output does.
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3484#discussion_r106163045

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java ---
    @@ -333,32 +373,39 @@ public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int ty
                     downStreamVertexID,
                     typeNumber,
                     null,
    -                new ArrayList());
    +                new ArrayList(), null);
    --- End diff --

    The `null` should go on the next line for uniformity.
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3484#discussion_r106162749

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java ---
    @@ -48,15 +49,25 @@
          * output selection).
          */
         private final List selectedNames;
    +
    +    /**
    +     * The side-output tag (if any) of this {@link StreamEdge}.
    +     */
    +    private final OutputTag outputTag;
    +
    +    /**
    +     * The {@link StreamPartitioner} on this {@link StreamEdge}.
    +     */
         private StreamPartitioner outputPartitioner;

         public StreamEdge(StreamNode sourceVertex, StreamNode targetVertex, int typeNumber,
    -            List selectedNames, StreamPartitioner outputPartitioner) {
    +            List selectedNames, StreamPartitioner outputPartitioner, OutputTag outputTag) {
             this.sourceVertex = sourceVertex;
             this.targetVertex = targetVertex;
             this.typeNumber = typeNumber;
             this.selectedNames = selectedNames;
             this.outputPartitioner = outputPartitioner;
    +        this.outputTag = outputTag;
    --- End diff --

    Does it make sense to also include the outputTag in the `edgeId`?
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3484#discussion_r106163521

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java ---
    @@ -63,6 +66,7 @@
         private TypeSerializer typeSerializerIn1;
         private TypeSerializer typeSerializerIn2;
         private TypeSerializer typeSerializerOut;
    +    private Map typeSerializerMap;
    --- End diff --

    This is not used anywhere in the code. Can it be removed, along with the `getTypeSerializerOut()` and `setTypeSerializerOut()`?
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3484#discussion_r106139593

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ---
    @@ -441,26 +486,53 @@ public void close() {
         private static final class CopyingChainingOutput extends ChainingOutput {

             private final TypeSerializer serializer;
    -
    +
             public CopyingChainingOutput(
                     OneInputStreamOperator operator,
                     TypeSerializer serializer,
    +                OutputTag outputTag,
                     StreamStatusProvider streamStatusProvider) {
    -            super(operator, streamStatusProvider);
    +            super(operator, streamStatusProvider, outputTag);
                 this.serializer = serializer;
             }

             @Override
             public void collect(StreamRecord record) {
    +            if (this.outputTag != null) {
    +                // we are only responsible for emitting to the main input
    +                return;
    +            }
    +
    +            pushToOperator(record);
    +        }
    +
    +        @Override
    +        public void collect(OutputTag outputTag, StreamRecord record) {
    +            if (this.outputTag == null || !this.outputTag.equals(outputTag)) {
    +                // we are only responsible for emitting to the side-output specified by our
    +                // OutputTag.
    +                return;
    +            }
    +
    +            pushToOperator(record);
    +        }
    +
    +        @Override
    +        protected void pushToOperator(StreamRecord record) {
                 try {
    +                // we know that the given outputTag matches our OutputTag so the record
    +                // must be of the type that our operator (and Serializer) expects.
    +                @SuppressWarnings("unchecked")
    +                StreamRecord castRecord = (StreamRecord) record;
    +
                     numRecordsIn.inc();
    -                StreamRecord copy = record.copy(serializer.copy(record.getValue()));
    +                StreamRecord copy = castRecord.copy(serializer.copy(castRecord.getValue()));
                     operator.setKeyContextElement1(copy);
    -                operator.processElement(copy);
    -            }
    -            catch (Exception e) {
    +                operator.processElement(castRecord);
    --- End diff --

    This should be `copy`, not `castRecord`.
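To illustrate why the copy, and not the original record, has to be passed on, here is a minimal sketch in plain Java. It does not use Flink; `Element`, `CopyingOutput` and `originalAfterCollect` are hypothetical names made up for this illustration, and the `StringBuilder` copy merely stands in for `serializer.copy(...)`.

```java
import java.util.function.Consumer;

class CopyingOutputSketch {

    /** Hypothetical stand-in for a stream record carrying a mutable value. */
    static final class Element {
        final StringBuilder value;

        Element(StringBuilder value) {
            this.value = value;
        }
    }

    /** Deep-copies every element and forwards only the copy downstream. */
    static final class CopyingOutput {
        private final Consumer<Element> downstream;

        CopyingOutput(Consumer<Element> downstream) {
            this.downstream = downstream;
        }

        void collect(Element element) {
            Element copy = new Element(new StringBuilder(element.value));
            // Forward the copy; forwarding `element` here would defeat the copy.
            downstream.accept(copy);
        }
    }

    /** Sends one element through a mutating consumer and returns the original's value. */
    static String originalAfterCollect() {
        Element original = new Element(new StringBuilder("hello"));
        CopyingOutput output = new CopyingOutput(e -> e.value.append("-mutated"));
        output.collect(original);
        return original.value.toString();
    }

    public static void main(String[] args) {
        System.out.println(originalAfterCollect());
    }
}
```

If `collect` handed `element` to the consumer instead of `copy`, the downstream mutation would leak back into the upstream object, which is the kind of problem the copying output exists to prevent when object reuse is disabled.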
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3484#discussion_r106138473

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java ---
    @@ -416,4 +428,26 @@ private boolean canBeParallel() {
             transformation.setSlotSharingGroup(slotSharingGroup);
             return this;
         }
    +
    +    /**
    +     * Gets the {@link DataStream} that contains the elements that are emitted from an operation
    +     * into the side output with the given {@link OutputTag}.
    +     *
    +     * @see org.apache.flink.streaming.api.functions.ProcessFunction.Context#output(OutputTag, Object)
    +     */
    +    public DataStream getSideOutput(OutputTag sideOutputTag){
    +        sideOutputTag = clean(sideOutputTag);
    +
    +        TypeInformation type = requestedSideOutputs.get(sideOutputTag);
    +        if (type != null && !type.equals(sideOutputTag.getTypeInfo())) {
    +            throw new UnsupportedOperationException("A side output with a matching id was " +
    +                    "already requested with a different type. This is not allowed, side output " +
    +                    "ids need to be unique.");
    +        }
    +
    +        requestedSideOutputs.put(sideOutputTag, sideOutputTag.getTypeInfo());
    +
    --- End diff --

    The `requireNonNull` check should be at the beginning of the method.
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3484#discussion_r106138895

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SideOutputTransformation.java ---
    @@ -0,0 +1,73 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.api.transformations;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +import com.google.common.collect.Lists;
    +import org.apache.flink.util.OutputTag;
    +import org.apache.flink.streaming.api.operators.ChainingStrategy;
    +
    +import java.util.Collection;
    +import java.util.List;
    +
    +
    +/**
    + * This transformation represents a selection of a side output of an upstream operation with a
    + * given {@link OutputTag}.
    + *
    + * This does not create a physical operation, it only affects how upstream operations are
    + * connected to downstream operations.
    + *
    + * @param The type of the elements that result from this {@code SideOutputTransformation}
    + */
    --- End diff --

    Here we do not check whether the `input` is `null` (we only do that in the caller method), but we still try to get its parallelism. We could pass the parallelism as a separate argument and then, after the `super()` call, check whether the input is null. This makes the code of the class self-contained, as you do not have to check other classes to see whether the `input` can be `null` or not. What do you think?
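The suggestion above can be sketched in plain Java as follows. This is only an illustration under stated assumptions, not the Flink class: `Base`, `SideOutput` and `messageForNullInput` are hypothetical names, and `Object` stands in for the input transformation type. Because `super(...)` has to be the first statement of a constructor, dereferencing `input` inside the `super(...)` arguments would throw a bare `NullPointerException` before any check could run; taking the parallelism as its own argument avoids that.

```java
import java.util.Objects;

class TransformationSketch {

    /** Hypothetical base class that needs the parallelism at construction time. */
    static class Base {
        final String name;
        final int parallelism;

        Base(String name, int parallelism) {
            this.name = name;
            this.parallelism = parallelism;
        }
    }

    /** Hypothetical side-output transformation that validates its own input. */
    static final class SideOutput extends Base {
        final Object input;

        SideOutput(Object input, int parallelism) {
            // The parallelism is passed in separately, so `input` is not
            // dereferenced before it has been checked.
            super("SideOutput", parallelism);
            this.input = Objects.requireNonNull(input, "input must not be null");
        }
    }

    /** Returns the exception message produced when the input is null. */
    static String messageForNullInput() {
        try {
            new SideOutput(null, 4);
            return "no exception";
        } catch (NullPointerException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(messageForNullInput());
    }
}
```

With this shape, a reader of the class can see from the constructor alone that `input` is never `null`, without looking at the callers.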
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3484#discussion_r106139530

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ---
    @@ -441,26 +486,53 @@ public void close() {
         private static final class CopyingChainingOutput extends ChainingOutput {

             private final TypeSerializer serializer;
    -
    +
             public CopyingChainingOutput(
                     OneInputStreamOperator operator,
                     TypeSerializer serializer,
    +                OutputTag outputTag,
                     StreamStatusProvider streamStatusProvider) {
    -            super(operator, streamStatusProvider);
    +            super(operator, streamStatusProvider, outputTag);
                 this.serializer = serializer;
             }

             @Override
             public void collect(StreamRecord record) {
    +            if (this.outputTag != null) {
    +                // we are only responsible for emitting to the main input
    +                return;
    +            }
    +
    +            pushToOperator(record);
    +        }
    +
    +        @Override
    +        public void collect(OutputTag outputTag, StreamRecord record) {
    +            if (this.outputTag == null || !this.outputTag.equals(outputTag)) {
    +                // we are only responsible for emitting to the side-output specified by our
    +                // OutputTag.
    +                return;
    +            }
    +
    +            pushToOperator(record);
    +        }
    +
    +        @Override
    --- End diff --

    This can become `private`, as before.
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3484#discussion_r106138342

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java ---
    @@ -416,4 +428,26 @@ private boolean canBeParallel() {
             transformation.setSlotSharingGroup(slotSharingGroup);
             return this;
         }
    +
    +    /**
    +     * Gets the {@link DataStream} that contains the elements that are emitted from an operation
    +     * into the side output with the given {@link OutputTag}.
    +     *
    +     * @see org.apache.flink.streaming.api.functions.ProcessFunction.Context#output(OutputTag, Object)
    +     */
    +    public DataStream getSideOutput(OutputTag sideOutputTag){
    +        sideOutputTag = clean(sideOutputTag);
    --- End diff --

    I think it is better not to reuse the argument variable but to create a new one.
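The review remarks on `getSideOutput` (null check first, no reassignment of the parameter, one type per id) can be combined in a small plain-Java sketch. Everything here is hypothetical and only mirrors the shape of the method in the diff: `SideOutputRegistrySketch` and `request` are invented names, a `String` id stands in for the `OutputTag`, a `Class<?>` stands in for the `TypeInformation`, and `trim()` is just a placeholder for the `clean(...)` call.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

class SideOutputRegistrySketch {

    // Remembers which element type each side-output id was requested with.
    private final Map<String, Class<?>> requested = new HashMap<>();

    /** Registers a side-output id with its element type and returns the cleaned id. */
    String request(String id, Class<?> type) {
        // Validate the arguments before touching them.
        Objects.requireNonNull(id, "id must not be null");
        Objects.requireNonNull(type, "type must not be null");

        // Use a new local variable instead of reassigning the parameter.
        String cleanedId = id.trim();

        Class<?> existing = requested.get(cleanedId);
        if (existing != null && !existing.equals(type)) {
            throw new UnsupportedOperationException(
                    "A side output with id '" + cleanedId + "' was already requested with a different type.");
        }

        requested.put(cleanedId, type);
        return cleanedId;
    }

    public static void main(String[] args) {
        SideOutputRegistrySketch registry = new SideOutputRegistrySketch();
        System.out.println(registry.request(" late ", String.class));
    }
}
```

Requesting the same id twice with the same type is accepted, while a second request with a different type is rejected, which matches the intent of the check in the quoted diff.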
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3484#discussion_r106139079

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---
    @@ -567,6 +600,17 @@ protected boolean isLate(W window) {
         }

         /**
    +     * Decide if a record is currently late, based on current watermark and allowed lateness.
    +     *
    +     * @param element The element to check
    +     * @return The element for which should be considered when sideoutputs
    +     */
    +    protected boolean isLate(StreamRecord element){
    --- End diff --

    This is not used any more, right? So it can be deleted.
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3484#discussion_r106138237

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java ---
    @@ -416,4 +428,26 @@ private boolean canBeParallel() {
             transformation.setSlotSharingGroup(slotSharingGroup);
             return this;
         }
    +
    +    /**
    +     * Gets the {@link DataStream} that contains the elements that are emitted from an operation
    +     * into the side output with the given {@link OutputTag}.
    +     *
    +     * @see org.apache.flink.streaming.api.functions.ProcessFunction.Context#output(OutputTag, Object)
    +     */
    --- End diff --

    Missing space between the `)` and the `{`.
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3484#discussion_r106138062

    --- Diff: flink-core/src/main/java/org/apache/flink/util/OutputTag.java ---
    @@ -0,0 +1,115 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.util;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +import java.io.Serializable;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.api.common.functions.InvalidTypesException;
    +import org.apache.flink.api.common.typeinfo.TypeHint;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +
    +
    +/**
    + * An {@link OutputTag} is a typed and named tag to use for tagging side outputs
    + * of an operator.
    + *
    + * An {@code OutputTag} must always be an anonymous inner class so that Flink can derive
    + * a {@link TypeInformation} for the generic type parameter.
    + *
    + * Example:
    + * {@code
    + * OutputTag> info = new OutputTag >("late-data"){});
    + * }
    + *
    + * @param the type of elements in the side-output stream.
    + */
    +@PublicEvolving
    +public class OutputTag implements Serializable {
    +
    +    private static final long serialVersionUID = 1L;
    +
    +    private final String id;
    +
    +    private transient TypeInformation typeInfo;
    +
    +    /**
    +     * Creates a new named {@code OutputTag} with the given id.
    +     *
    +     * @param id The id of the created {@code OutputTag}.
    +     */
    +    public OutputTag(String id) {
    +        Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
    +        this.id = requireNonNull(id);
    +
    +        try {
    +            TypeHint typeHint =
    +                    new TypeHint(OutputTag.class, this, 0) {};
    +            this.typeInfo = typeHint.getTypeInfo();
    +        } catch (InvalidTypesException e) {
    +            throw new InvalidTypesException("Could not determine TypeInformation for generic " +
    +                    "OutputTag type. Did you forget to make your OutputTag an anonymous inner class?", e);
    +        }
    +    }
    +
    +    /**
    +     * Creates a new named {@code OutputTag} with the given id and output {@link TypeInformation}.
    +     *
    +     * @param id The id of the created {@code OutputTag}.
    +     * @param typeInfo The {@code TypeInformation} for the side output.
    +     */
    +    public OutputTag(String id, TypeInformation typeInfo) {
    +        this.id = Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
    +        this.typeInfo =
    +                Preconditions.checkNotNull(typeInfo, "TypeInformation cannot be null.");
    +    }
    +
    +    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
    +        in.defaultReadObject();
    +        typeInfo = null;
    +    }
    +
    +    public String getId() {
    +        return id;
    +    }
    +
    +    public TypeInformation getTypeInfo() {
    +        return typeInfo;
    +    }
    +
    +    @Override
    +    public boolean equals(Object obj) {
    +        return obj instanceof OutputTag
    --- End diff --

    The first comment still applies: the `equals` can be simplified given that `id != null`.
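As a rough illustration of the simplification being asked for, the sketch below shows an `equals`/`hashCode` pair for a tag whose id is guaranteed non-null by the constructor. `SimpleTag` is a hypothetical, non-generic stand-in and not the `OutputTag` class from the pull request; the full `equals` body of the real class is not visible in the quoted diff.

```java
import java.util.Objects;

class SimpleTag {

    private final String id;

    SimpleTag(String id) {
        // The constructor rejects null, so `id` is never null afterwards.
        this.id = Objects.requireNonNull(id, "id must not be null");
    }

    @Override
    public boolean equals(Object obj) {
        // No null handling for `id` is needed; `instanceof` already covers obj == null.
        return obj instanceof SimpleTag && id.equals(((SimpleTag) obj).id);
    }

    @Override
    public int hashCode() {
        return id.hashCode();
    }

    public static void main(String[] args) {
        System.out.println(new SimpleTag("late-data").equals(new SimpleTag("late-data")));
    }
}
```

Since equality depends only on the id, two tags with the same id compare equal regardless of any other state, which is why id uniqueness matters for routing.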
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3484#discussion_r106139311

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ---
    @@ -326,33 +327,46 @@ public int getChainLength() {
                 Map chainedConfigs,
                 ClassLoader userCodeClassloader,
                 Map streamOutputs,
    -            List allOperators)
    +            List allOperators,
    +            OutputTag outputTag)
         {
             // create the output that the operator writes to first. this may recursively create more operators
             Output output = createOutputCollector(
                     containingTask, operatorConfig, chainedConfigs, userCodeClassloader, streamOutputs, allOperators);

             // now create the operator and give it the output collector to write its output to
             OneInputStreamOperator chainedOperator = operatorConfig.getStreamOperator(userCodeClassloader);
    +
             chainedOperator.setup(containingTask, operatorConfig, output);

             allOperators.add(chainedOperator);

             if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
    -            return new ChainingOutput<>(chainedOperator, this);
    +            return new ChainingOutput<>(chainedOperator, this, outputTag);
             } else {
                 TypeSerializer inSerializer = operatorConfig.getTypeSerializerIn1(userCodeClassloader);
    -            return new CopyingChainingOutput<>(chainedOperator, inSerializer, this);
    +            return new CopyingChainingOutput<>(chainedOperator, inSerializer, outputTag, this);
             }
         }

         private RecordWriterOutput createStreamOutput(
                 StreamEdge edge, StreamConfig upStreamConfig, int outputIndex,
                 Environment taskEnvironment,
    -            String taskName)
    -    {
    -        TypeSerializer outSerializer = upStreamConfig.getTypeSerializerOut(taskEnvironment.getUserClassLoader());
    +            String taskName) {
    +        OutputTag sideOutputTag = edge.getOutputTag(); // OutputTag, return null if not sideOutput
    +
    +        TypeSerializer outSerializer = null;
    +
    +        if (edge.getOutputTag() != null) {
    +            // side output
    +            outSerializer = upStreamConfig.getTypeSerializerSideOut(
    +                    edge.getOutputTag(), taskEnvironment.getUserClassLoader());
    +        } else {
    +            // main output
    --- End diff --

    This can become one line.
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3484#discussion_r106139449

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ---
    @@ -387,18 +401,49 @@ public int getChainLength() {

             protected final StreamStatusProvider streamStatusProvider;

    -        public ChainingOutput(OneInputStreamOperator operator, StreamStatusProvider streamStatusProvider) {
    +        protected final OutputTag outputTag;
    +
    +        public ChainingOutput(
    +                OneInputStreamOperator operator,
    +                StreamStatusProvider streamStatusProvider,
    +                OutputTag outputTag) {
                 this.operator = operator;
                 this.numRecordsIn = ((OperatorMetricGroup) operator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
                 this.streamStatusProvider = streamStatusProvider;
    +            this.outputTag = outputTag;
             }

             @Override
             public void collect(StreamRecord record) {
    +            if (this.outputTag != null) {
    +                // we are only responsible for emitting to the main input
    +                return;
    +            }
    +
    +            pushToOperator(record);
    +        }
    +
    +        @Override
    +        public void collect(OutputTag outputTag, StreamRecord record) {
    +            if (this.outputTag == null || !this.outputTag.equals(outputTag)) {
    +                // we are only responsible for emitting to the side-output specified by our
    +                // OutputTag.
    +                return;
    +            }
    +
    +            pushToOperator(record);
    +        }
    +
    --- End diff --

    This can become `private` as the copying alternative has its own implementation, right?
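The routing rule in the two `collect` methods of the diff can be sketched without Flink as follows. This is a simplified model under stated assumptions: `RoutingOutput` and `demo` are hypothetical names, a `String` stands in for both the `OutputTag` and the record, and every emission is offered to all outputs, each of which decides on its own whether the record is meant for it.

```java
import java.util.ArrayList;
import java.util.List;

class TagRoutingSketch {

    /** Hypothetical output serving either the main output (tag == null) or one side output. */
    static final class RoutingOutput {
        private final String tag;
        final List<String> received = new ArrayList<>();

        RoutingOutput(String tag) {
            this.tag = tag;
        }

        /** Main-output emission: ignored by outputs that serve a side output. */
        void collect(String record) {
            if (tag != null) {
                return;
            }
            received.add(record);
        }

        /** Side-output emission: accepted only when the tags match. */
        void collect(String outputTag, String record) {
            if (tag == null || !tag.equals(outputTag)) {
                return;
            }
            received.add(record);
        }
    }

    /** Offers three emissions to a main and a "late" output and reports what each kept. */
    static String demo() {
        RoutingOutput main = new RoutingOutput(null);
        RoutingOutput late = new RoutingOutput("late");

        // One untagged emission.
        main.collect("a");
        late.collect("a");

        // One emission tagged "late".
        main.collect("late", "b");
        late.collect("late", "b");

        // One emission with a tag nobody serves.
        main.collect("other", "c");
        late.collect("other", "c");

        return main.received + "|" + late.received;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this model the main output keeps only the untagged record, the "late" output keeps only the record emitted with its tag, and the record with the unserved tag is dropped by both.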
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105441679 --- Diff: flink-core/src/main/java/org/apache/flink/util/OutputTag.java --- @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.util; + +import static java.util.Objects.requireNonNull; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.Serializable; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; + + +/** + * An {@link OutputTag} is a typed and named tag to use for tagging side outputs + * of an operator. + * + * An {@code OutputTag} must always be an anonymous inner class so that Flink can derive + * a {@link TypeInformation} for the generic type parameter. + * + * Example: + * {@code + * OutputTag> info = new OutputTag >("late-data"){}); + * } + * + * @param the type of elements in the side-output stream. 
+ */ +@PublicEvolving +public class OutputTag implements Serializable { + + private static final long serialVersionUID = 1L; + + private final String id; + + private transient TypeInformation typeInfo; + + /** +* Creates a new named {@code OutputTag} with the given id. +* +* @param id The id of the created {@code OutputTag}. + */ + public OutputTag(String id) { + Preconditions.checkNotNull(id, "OutputTag id cannot be null."); + this.id = requireNonNull(id); + + try { + TypeHint typeHint = + new TypeHint(OutputTag.class, this, 0) {}; + this.typeInfo = typeHint.getTypeInfo(); + } catch (InvalidTypesException e) { + throw new InvalidTypesException("Could not determine TypeInformation for generic " + + "OutputTag type. Did you forget to make your OutputTag an anonymous inner class?", e); + } + } + + /** +* Creates a new named {@code OutputTag} with the given id and output {@link TypeInformation}. +* +* @param id The id of the created {@code OutputTag}. +* @param typeInfo The {@code TypeInformation} for the side output. +*/ + public OutputTag(String id, TypeInformation typeInfo) { + this.id = Preconditions.checkNotNull(id, "OutputTag id cannot be null."); + this.typeInfo = + Preconditions.checkNotNull(typeInfo, "TypeInformation cannot be null."); + } + + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + in.defaultReadObject(); + typeInfo = null; + } + + public String getId() { + return id; + } + + public TypeInformation getTypeInfo() { + return typeInfo; + } + + @Override + public boolean equals(Object obj) { + return obj instanceof OutputTag --- End diff -- I see. The problem is that if this does not work, then we can have important side effects. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105441456 --- Diff: flink-core/src/main/java/org/apache/flink/util/OutputTag.java --- @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.util; + +import static java.util.Objects.requireNonNull; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.Serializable; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; + + +/** + * An {@link OutputTag} is a typed and named tag to use for tagging side outputs + * of an operator. + * + * An {@code OutputTag} must always be an anonymous inner class so that Flink can derive + * a {@link TypeInformation} for the generic type parameter. + * + * Example: + * {@code + * OutputTag> info = new OutputTag >("late-data"){}); + * } + * + * @param the type of elements in the side-output stream. 
+ */ +@PublicEvolving +public class OutputTag implements Serializable { + + private static final long serialVersionUID = 1L; + + private final String id; + + private transient TypeInformation typeInfo; + + /** +* Creates a new named {@code OutputTag} with the given id. +* +* @param id The id of the created {@code OutputTag}. + */ + public OutputTag(String id) { + Preconditions.checkNotNull(id, "OutputTag id cannot be null."); + this.id = requireNonNull(id); + + try { + TypeHint typeHint = + new TypeHint(OutputTag.class, this, 0) {}; + this.typeInfo = typeHint.getTypeInfo(); + } catch (InvalidTypesException e) { + throw new InvalidTypesException("Could not determine TypeInformation for generic " + + "OutputTag type. Did you forget to make your OutputTag an anonymous inner class?", e); + } + } + + /** +* Creates a new named {@code OutputTag} with the given id and output {@link TypeInformation}. +* +* @param id The id of the created {@code OutputTag}. +* @param typeInfo The {@code TypeInformation} for the side output. +*/ + public OutputTag(String id, TypeInformation typeInfo) { + this.id = Preconditions.checkNotNull(id, "OutputTag id cannot be null."); + this.typeInfo = + Preconditions.checkNotNull(typeInfo, "TypeInformation cannot be null."); + } + + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + in.defaultReadObject(); + typeInfo = null; + } + + public String getId() { + return id; + } + + public TypeInformation getTypeInfo() { + return typeInfo; + } + + @Override + public boolean equals(Object obj) { + return obj instanceof OutputTag --- End diff -- I would have liked to include the `TypeInformation` into the check but we can't do that because it's transient. I'll try and figure something out for checking that side outputs are unique, not as easy as it seems. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105436408 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java --- @@ -539,5 +625,26 @@ public void collect(StreamRecord record) { // don't copy for the last output outputs[outputs.length - 1].collect(record); } + --- End diff -- fixed.
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105436193 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java --- @@ -387,14 +403,25 @@ public int getChainLength() { protected final StreamStatusProvider streamStatusProvider; - public ChainingOutput(OneInputStreamOperatoroperator, StreamStatusProvider streamStatusProvider) { + protected final OutputTag outputTag; + + public ChainingOutput( + OneInputStreamOperator operator, + StreamStatusProvider streamStatusProvider, + OutputTag outputTag) { this.operator = operator; this.numRecordsIn = ((OperatorMetricGroup) operator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter(); this.streamStatusProvider = streamStatusProvider; + this.outputTag = outputTag; } @Override --- End diff -- This I'm fixing, as I mentioned above.
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105435806 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java --- @@ -72,6 +76,11 @@ public RecordWriterOutput( --- End diff -- I can remove the duplication in `RecordWriterOutput`, `ChainingOutput` and `CopyingChainingOutput`, that is, in those `Outputs` that don't forward to other outputs but instead push into the operator or into the network. For the other `Outputs`, removing the duplication is not possible because inside their respective `collect()` methods they call `collect()` of another `Output`. They call it either with an `OutputTag` or without, so the method bodies are not actually duplicates. I did find another bug, though, where `CopyingBroadcastingOutputCollector` in `OperatorChain` was not calling the correct `collect()` method on the downstream `Outputs`.
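To illustrate why a forwarding `Output` cannot share one body between its two `collect()` overloads, here is a minimal sketch. All names (`OutSketch`, `RecordingOut`, `BroadcastingOutSketch`) are hypothetical, and plain `String` values stand in for `OutputTag` and `StreamRecord`; this is not the code from the pull request.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal Output interface with an untagged and a tagged collect().
interface OutSketch {
    void collect(String record);

    void collect(String tag, String record);
}

// Records which overload was called, so that the forwarding can be observed.
final class RecordingOut implements OutSketch {
    final List<String> calls = new ArrayList<>();

    @Override
    public void collect(String record) {
        calls.add("main:" + record);
    }

    @Override
    public void collect(String tag, String record) {
        calls.add(tag + ":" + record);
    }
}

// Forwarding output: each overload has to call the matching overload downstream.
final class BroadcastingOutSketch implements OutSketch {
    private final OutSketch[] outputs;

    BroadcastingOutSketch(OutSketch... outputs) {
        this.outputs = outputs;
    }

    @Override
    public void collect(String record) {
        for (OutSketch out : outputs) {
            out.collect(record);
        }
    }

    @Override
    public void collect(String tag, String record) {
        for (OutSketch out : outputs) {
            // Forward the tag; calling out.collect(record) here would drop it.
            out.collect(tag, record);
        }
    }
}
```

The two loops look alike, but they differ in the one call that matters, which is the kind of mistake described above for the broadcasting collector.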
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105398836 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java --- @@ -300,6 +303,36 @@ private StreamGraph generateInternal(Listtransformatio } /** +* Transforms a {@code SideOutputTransformation}. +* +* +* For this we create a virtual node in the {@code StreamGraph} that holds the side-output +* {@link org.apache.flink.util.OutputTag}. +* +* @see org.apache.flink.streaming.api.graph.StreamGraphGenerator +*/ + private Collection transformSideOutput(SideOutputTransformation sideOutput) { + StreamTransformation input = sideOutput.getInput(); + Collection resultIds = transform(input); + + + // the recursive transform might have already transformed this + if (alreadyTransformed.containsKey(sideOutput)) { + return alreadyTransformed.get(sideOutput); + } + + List virtualResultIds = new ArrayList<>(); + + for (int inputId : resultIds) { + int virtualId = StreamTransformation.getNewNodeId(); + streamGraph.addVirtualSideOutputNode(inputId, virtualId, sideOutput.getOutputTag()); + virtualResultIds.add(virtualId); + } + return virtualResultIds; + } + + --- End diff -- Leave only one empty line.
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105402746 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java --- @@ -1528,14 +1572,16 @@ public void testDropDueToLatenessSessionZeroLatenessPurgingTrigger() throws Exce stateDesc, new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()), PurgingTrigger.of(EventTimeTrigger.create()), - LATENESS); + LATENESS, + lateOutputTag); --- End diff -- wrong alignment
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105409533 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java --- @@ -72,6 +76,11 @@ public RecordWriterOutput( --- End diff -- We can't throw, because that would crash the program. This is a good catch, though! I will remove the duplication.
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105401808 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java --- @@ -441,26 +491,55 @@ public void close() { private static final class CopyingChainingOutput extends ChainingOutput { private final TypeSerializer serializer; - + public CopyingChainingOutput( OneInputStreamOperatoroperator, TypeSerializer serializer, + OutputTag outputTag, StreamStatusProvider streamStatusProvider) { - super(operator, streamStatusProvider); + super(operator, streamStatusProvider, outputTag); this.serializer = serializer; } --- End diff -- Again, the two `collect()` methods have duplicate code (after the cast).
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105397306 --- Diff: flink-core/src/main/java/org/apache/flink/util/OutputTag.java --- @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.util; + +import static java.util.Objects.requireNonNull; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.Serializable; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; + + +/** + * An {@link OutputTag} is a typed and named tag to use for tagging side outputs + * of an operator. + * + * An {@code OutputTag} must always be an anonymous inner class so that Flink can derive + * a {@link TypeInformation} for the generic type parameter. + * + * Example: + * {@code + * OutputTag> info = new OutputTag >("late-data"){}); + * } + * + * @param the type of elements in the side-output stream. 
+ */ +@PublicEvolving +public class OutputTag implements Serializable { + + private static final long serialVersionUID = 1L; + + private final String id; + + private transient TypeInformation typeInfo; + + /** +* Creates a new named {@code OutputTag} with the given id. +* +* @param id The id of the created {@code OutputTag}. + */ + public OutputTag(String id) { + Preconditions.checkNotNull(id, "OutputTag id cannot be null."); --- End diff -- We do not need both lines with the checks. We can just have: `this.id = Preconditions.checkNotNull(id, "OutputTag id cannot be null.");` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
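The suggestion above folds the two null checks into one call whose return value is assigned directly. A minimal sketch of that shape follows; it uses `java.util.Objects.requireNonNull` as a stand-in for Flink's `Preconditions.checkNotNull` so that it has no Flink dependency, and `TagSketch` is a hypothetical name.

```java
import java.util.Objects;

// Hypothetical stand-in for OutputTag, reduced to the id handling only.
final class TagSketch {
    private final String id;

    TagSketch(String id) {
        // One call validates and assigns: requireNonNull returns its argument.
        this.id = Objects.requireNonNull(id, "OutputTag id cannot be null.");
    }

    String getId() {
        return id;
    }
}
```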
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105403355 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java --- @@ -40,6 +41,12 @@ public void collect(StreamRecord record) { } @Override + public void collect( --- End diff -- The signature fits in one line
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105399457 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SideOutputTransformation.java --- @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.transformations; + +import static java.util.Objects.requireNonNull; + +import com.google.common.collect.Lists; +import org.apache.flink.util.OutputTag; +import org.apache.flink.streaming.api.operators.ChainingStrategy; + +import java.util.Collection; +import java.util.List; + + +/** + * This transformation represents a selection of a side output of an upstream operation with a + * given {@link OutputTag}. + * + * This does not create a physical operation, it only affects how upstream operations are + * connected to downstream operations. + * + * @param The type of the elements that result from this {@code SideOutputTransformation} + */ +public class SideOutputTransformation extends StreamTransformation { + private final StreamTransformation input; --- End diff -- Leave a blank line here. 
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105397096 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeHint.java --- @@ -46,7 +46,15 @@ public TypeHint() { this.typeInfo = TypeExtractor.createTypeInfo(this, TypeHint.class, getClass(), 0); } - + + /** +* Creates a hint for the generic type in the class signature. +*/ + public TypeHint(Class baseClass, Object instance, int genericParameterPos) { + this.typeInfo = TypeExtractor.createTypeInfo(instance, baseClass, instance.getClass(), genericParameterPos); + } + + --- End diff -- Remove one of the 2 empty lines.
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105397933 --- Diff: flink-core/src/main/java/org/apache/flink/util/OutputTag.java --- @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.util; + +import static java.util.Objects.requireNonNull; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.Serializable; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; + + +/** + * An {@link OutputTag} is a typed and named tag to use for tagging side outputs + * of an operator. + * + * An {@code OutputTag} must always be an anonymous inner class so that Flink can derive + * a {@link TypeInformation} for the generic type parameter. + * + * Example: + * {@code + * OutputTag> info = new OutputTag >("late-data"){}); + * } + * + * @param the type of elements in the side-output stream. 
+ */ +@PublicEvolving +public class OutputTag implements Serializable { + + private static final long serialVersionUID = 1L; + + private final String id; + + private transient TypeInformation typeInfo; + + /** +* Creates a new named {@code OutputTag} with the given id. +* +* @param id The id of the created {@code OutputTag}. + */ + public OutputTag(String id) { + Preconditions.checkNotNull(id, "OutputTag id cannot be null."); + this.id = requireNonNull(id); + + try { + TypeHint typeHint = + new TypeHint(OutputTag.class, this, 0) {}; + this.typeInfo = typeHint.getTypeInfo(); + } catch (InvalidTypesException e) { + throw new InvalidTypesException("Could not determine TypeInformation for generic " + + "OutputTag type. Did you forget to make your OutputTag an anonymous inner class?", e); + } + } + + /** +* Creates a new named {@code OutputTag} with the given id and output {@link TypeInformation}. +* +* @param id The id of the created {@code OutputTag}. +* @param typeInfo The {@code TypeInformation} for the side output. +*/ + public OutputTag(String id, TypeInformation typeInfo) { + this.id = Preconditions.checkNotNull(id, "OutputTag id cannot be null."); + this.typeInfo = + Preconditions.checkNotNull(typeInfo, "TypeInformation cannot be null."); + } + + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + in.defaultReadObject(); + typeInfo = null; + } + + public String getId() { + return id; + } + + public TypeInformation getTypeInfo() { + return typeInfo; + } + + @Override + public boolean equals(Object obj) { + return obj instanceof OutputTag --- End diff -- Two points: 1) We cannot have `this.id == null` or `((OutputTag) obj).id == null` because we check in the constructor, so this method can be simplified. 2) We never check for uniqueness of the `outputTag.id`. We should do it at translation time. This is also a correctness issue, as it may result in undesired side-output "collisions".
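Both points can be sketched in a few lines. The following is a hypothetical illustration, not the pull request's code: `SimpleTag` uses a plain `String` type name in place of the transient `TypeInformation`, and `SideOutputIdCheck` is an assumed helper showing one way a translation-time check could reject an id that is reused with a different type.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical, simplified tag: the id is never null, so equals() needs no null handling.
final class SimpleTag {
    final String id;
    final String typeName; // stand-in for the transient TypeInformation

    SimpleTag(String id, String typeName) {
        this.id = Objects.requireNonNull(id, "OutputTag id cannot be null.");
        this.typeName = Objects.requireNonNull(typeName, "Type name cannot be null.");
    }

    @Override
    public boolean equals(Object obj) {
        // Equality is based on the id only, mirroring the discussion above.
        return obj instanceof SimpleTag && ((SimpleTag) obj).id.equals(this.id);
    }

    @Override
    public int hashCode() {
        return id.hashCode();
    }
}

// Hypothetical translation-time check: one id may only ever be used with one type.
final class SideOutputIdCheck {
    private final Map<String, String> typeById = new HashMap<>();

    void register(SimpleTag tag) {
        String previous = typeById.putIfAbsent(tag.id, tag.typeName);
        if (previous != null && !previous.equals(tag.typeName)) {
            throw new IllegalStateException(
                "Side output id '" + tag.id + "' is already used with type " + previous);
        }
    }
}
```

Because `equals()` ignores the type, two tags with the same id but different element types compare equal; the separate check is what would surface the collision.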
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105401503 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java --- @@ -387,14 +403,25 @@ public int getChainLength() { protected final StreamStatusProvider streamStatusProvider; - public ChainingOutput(OneInputStreamOperatoroperator, StreamStatusProvider streamStatusProvider) { + protected final OutputTag outputTag; + + public ChainingOutput( + OneInputStreamOperator operator, + StreamStatusProvider streamStatusProvider, + OutputTag outputTag) { this.operator = operator; this.numRecordsIn = ((OperatorMetricGroup) operator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter(); this.streamStatusProvider = streamStatusProvider; + this.outputTag = outputTag; } @Override --- End diff -- Again, the two `collect()` methods have largely identical code. We can put the common code in a separate private method and call that instead of repeating it.
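A minimal sketch of the refactoring suggested above, for an output that pushes records directly into an operator. Everything here is hypothetical: `LeafOutputSketch` stores records in a list instead of calling a real operator, `String` stands in for `OutputTag` and `StreamRecord`, and the tag-filtering rules are an assumption about the intended behaviour rather than something shown in the diff.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical leaf output: collects into a list instead of a real operator.
final class LeafOutputSketch {
    private final String outputTag; // null means this output sits on the main edge
    private final List<String> received = new ArrayList<>();
    private long numRecordsIn = 0;

    LeafOutputSketch(String outputTag) {
        this.outputTag = outputTag;
    }

    // Untagged records are only accepted by an output on the main edge (assumed rule).
    void collect(String record) {
        if (this.outputTag != null) {
            return;
        }
        pushToOperator(record);
    }

    // Tagged records are only accepted when the tag matches this output's tag (assumed rule).
    void collect(String tag, String record) {
        if (this.outputTag == null || !this.outputTag.equals(tag)) {
            return;
        }
        pushToOperator(record);
    }

    // The code both overloads used to repeat now lives in one private method.
    private void pushToOperator(String record) {
        numRecordsIn++;
        received.add(record);
    }

    List<String> getReceived() {
        return received;
    }

    long getNumRecordsIn() {
        return numRecordsIn;
    }
}
```

Only the guard at the top of each overload differs; the metric update and the hand-off to the operator are written once.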
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105402262 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java --- @@ -539,5 +625,26 @@ public void collect(StreamRecord record) { // don't copy for the last output outputs[outputs.length - 1].collect(record); } + + @Override + public void collect( + OutputTag outputTag, StreamRecord record) { + for (int i = 0; i < outputs.length - 1; i++) { + Outputoutput = outputs[i]; + + // due to side outputs, StreamRecords of varying types can pass through the broadcasting + // collector so we need to cast + @SuppressWarnings({"unchecked", "rawtypes"}) + StreamRecord shallowCopy = (StreamRecord) record.copy(record.getValue()); + output.collect(shallowCopy); + } + --- End diff -- This comment duplicates the one inside the loop. Also, the "// don't copy for the last output" comment is not right, because we actually create a copy before.
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105403240 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectorOutput.java --- @@ -53,5 +54,11 @@ public void collect(StreamRecord record) { } @Override --- End diff -- The signature can go on the same line
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105398702 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java --- @@ -333,32 +356,40 @@ public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int ty downStreamVertexID, typeNumber, null, - new ArrayList()); + new ArrayList(), null); } private void addEdgeInternal(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber, StreamPartitioner partitioner, - List outputNames) { - + List outputNames, + OutputTag outputTag) { - if (virtualSelectNodes.containsKey(upStreamVertexID)) { + if (virtualSideOutputNodes.containsKey(upStreamVertexID)) { + int virtualId = upStreamVertexID; + upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0; + if (outputTag == null) { + outputTag = virtualSideOutputNodes.get(virtualId).f1; + } + addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, null, outputTag); + } else if (virtualSelectNodes.containsKey(upStreamVertexID)) { int virtualId = upStreamVertexID; upStreamVertexID = virtualSelectNodes.get(virtualId).f0; if (outputNames.isEmpty()) { // selections that happen downstream override earlier selections outputNames = virtualSelectNodes.get(virtualId).f1; } - addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames); + addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag); } else if (virtualPartitionNodes.containsKey(upStreamVertexID)) { int virtualId = upStreamVertexID; upStreamVertexID = virtualPartitionNodes.get(virtualId).f0; if (partitioner == null) { partitioner = virtualPartitionNodes.get(virtualId).f1; } - addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames); + --- End diff -- Remove line for uniformity with the `if() ...` above. 
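The `addEdgeInternal` hunk above resolves a virtual side-output node to its physical upstream node and carries the node's tag along unless a tag was already chosen downstream. A reduced sketch of that resolution follows, under stated assumptions: `VirtualNodeSketch` is a hypothetical class, `String` stands in for `OutputTag`, and the edge is returned as a descriptive string instead of being added to a graph.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, reduced model of virtual-node resolution when adding an edge.
final class VirtualNodeSketch {
    // virtual id -> physical (or further virtual) upstream id
    private final Map<Integer, Integer> virtualUpstream = new HashMap<>();
    // virtual id -> side-output tag held by that virtual node
    private final Map<Integer, String> virtualTag = new HashMap<>();

    void addVirtualSideOutputNode(int upstreamId, int virtualId, String tag) {
        virtualUpstream.put(virtualId, upstreamId);
        virtualTag.put(virtualId, tag);
    }

    // Returns "upstream->downstream[tag]" for the edge that would be created.
    String addEdge(int upstreamId, int downstreamId, String tag) {
        if (virtualUpstream.containsKey(upstreamId)) {
            int virtualId = upstreamId;
            // A tag that is already set wins; otherwise take the virtual node's tag.
            String resolvedTag = (tag != null) ? tag : virtualTag.get(virtualId);
            // Recurse, because the upstream may itself be a virtual node.
            return addEdge(virtualUpstream.get(virtualId), downstreamId, resolvedTag);
        }
        return upstreamId + "->" + downstreamId + "[" + tag + "]";
    }
}
```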
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105400890 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java --- @@ -326,33 +327,48 @@ public int getChainLength() { MapchainedConfigs, ClassLoader userCodeClassloader, Map streamOutputs, - List allOperators) + List allOperators, + OutputTag outputTag) { // create the output that the operator writes to first. this may recursively create more operators Output output = createOutputCollector( containingTask, operatorConfig, chainedConfigs, userCodeClassloader, streamOutputs, allOperators); // now create the operator and give it the output collector to write its output to OneInputStreamOperator chainedOperator = operatorConfig.getStreamOperator(userCodeClassloader); + chainedOperator.setup(containingTask, operatorConfig, output); allOperators.add(chainedOperator); if (containingTask.getExecutionConfig().isObjectReuseEnabled()) { - return new ChainingOutput<>(chainedOperator, this); + return new ChainingOutput<>(chainedOperator, this, outputTag); } else { TypeSerializer inSerializer = operatorConfig.getTypeSerializerIn1(userCodeClassloader); - return new CopyingChainingOutput<>(chainedOperator, inSerializer, this); + return new CopyingChainingOutput<>(chainedOperator, inSerializer, outputTag, this); } } private RecordWriterOutput createStreamOutput( StreamEdge edge, StreamConfig upStreamConfig, int outputIndex, Environment taskEnvironment, - String taskName) - { - TypeSerializer outSerializer = upStreamConfig.getTypeSerializerOut(taskEnvironment.getUserClassLoader()); + String taskName) { + OutputTag sideOutputTag = edge.getOutputTag(); // OutputTag, return null if not sideOutput + + TypeSerializer outSerializer = null; + --- End diff -- too much line breaking in the following `if() else()` . 
We could easily reduce it or even replace it with:
```
TypeSerializer outSerializer = (edge.getOutputTag() != null) ?
    upStreamConfig.getTypeSerializerSideOut(edge.getOutputTag(), taskEnvironment.getUserClassLoader()) :
    upStreamConfig.getTypeSerializerOut(taskEnvironment.getUserClassLoader());
```
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105402830 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java --- @@ -1618,14 +1664,16 @@ public void testDropDueToLatenessSessionZeroLateness() throws Exception { stateDesc, new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()), EventTimeTrigger.create(), - LATENESS); + LATENESS, --- End diff -- wrong alignment
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105402616 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java --- @@ -1267,14 +1291,16 @@ public void testLateness() throws Exception { stateDesc, new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction>()), PurgingTrigger.of(EventTimeTrigger.create()), - LATENESS); + LATENESS, + lateOutputTag); --- End diff -- wrong alignment
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105397483 --- Diff: flink-core/src/main/java/org/apache/flink/util/OutputTag.java --- @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.util; + +import static java.util.Objects.requireNonNull; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.Serializable; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; + + +/** + * An {@link OutputTag} is a typed and named tag to use for tagging side outputs + * of an operator. + * + * An {@code OutputTag} must always be an anonymous inner class so that Flink can derive + * a {@link TypeInformation} for the generic type parameter. + * + * Example: + * {@code + * OutputTag> info = new OutputTag >("late-data"){}); + * } + * + * @param the type of elements in the side-output stream. 
+ */ +@PublicEvolving +public class OutputTag implements Serializable { + + private static final long serialVersionUID = 1L; + + private final String id; + + private transient TypeInformation typeInfo; + + /** +* Creates a new named {@code OutputTag} with the given id. +* +* @param id The id of the created {@code OutputTag}. + */ + public OutputTag(String id) { + Preconditions.checkNotNull(id, "OutputTag id cannot be null."); + this.id = requireNonNull(id); + + try { + TypeHint typeHint = + new TypeHint(OutputTag.class, this, 0) {}; + this.typeInfo = typeHint.getTypeInfo(); + } catch (InvalidTypesException e) { + throw new InvalidTypesException("Could not determine TypeInformation for generic " + + "OutputTag type. Did you forget to make your OutputTag an anonymous inner class?", e); + } + } + + /** +* Creates a new named {@code OutputTag} with the given id and output {@link TypeInformation}. +* +* @param id The id of the created {@code OutputTag}. +* @param typeInfo The {@code TypeInformation} for the side output. +*/ + public OutputTag(String id, TypeInformation typeInfo) { + this.id = Preconditions.checkNotNull(id, "OutputTag id cannot be null."); + this.typeInfo = --- End diff -- No need for line breaking.
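The Javadoc in the diff above says an `OutputTag` must be created as an anonymous inner class so that the generic type can be derived. As a minimal sketch of why that works in plain Java, the example below captures a type argument through `getGenericSuperclass()`. `TypedTag` and `TypedTagDemo` are hypothetical stand-ins written for illustration; they are not Flink's `OutputTag` or `TypeHint`, and the stand-in is abstract only to force the `{}` at the call site, which the real class does not do.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

// Hypothetical stand-in for a typed, named tag. NOT Flink's OutputTag.
abstract class TypedTag<T> {
    private final String id;
    private final Type elementType;

    protected TypedTag(String id) {
        if (id == null) {
            throw new NullPointerException("TypedTag id cannot be null.");
        }
        this.id = id;

        // An anonymous subclass records "TypedTag<ActualType>" as its generic
        // superclass, so the type argument survives erasure and can be read here.
        Type superType = getClass().getGenericSuperclass();
        if (!(superType instanceof ParameterizedType)) {
            throw new IllegalStateException(
                "Could not determine the element type. Did you forget the type argument?");
        }
        this.elementType = ((ParameterizedType) superType).getActualTypeArguments()[0];
    }

    String getId() {
        return id;
    }

    Type getElementType() {
        return elementType;
    }
}

class TypedTagDemo {
    public static void main(String[] args) {
        // The trailing {} creates the anonymous subclass that carries List<String>.
        TypedTag<List<String>> tag = new TypedTag<List<String>>("late-data") {};
        System.out.println(tag.getId() + " carries " + tag.getElementType());
    }
}
```

Without the trailing `{}` there is no subclass to carry the type argument, which is presumably the situation the "Did you forget to make your OutputTag an anonymous inner class?" message in the diff is meant to catch.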
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105402899 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java --- @@ -1702,15 +1754,16 @@ public void testDropDueToLatenessSessionWithLatenessPurgingTrigger() throws Exce stateDesc, new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()), PurgingTrigger.of(EventTimeTrigger.create()), - LATENESS); + LATENESS, --- End diff -- wrong alignment
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105401947 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java --- @@ -539,5 +625,26 @@ public void collect(StreamRecord record) { // don't copy for the last output outputs[outputs.length - 1].collect(record); } + --- End diff -- The arguments can go on the same line as the method name.
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105398138 --- Diff: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java --- @@ -17,13 +17,19 @@ package org.apache.flink.streaming.examples.windowing; --- End diff -- Unused imports.
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105402678 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java --- @@ -1393,14 +1428,16 @@ public void testDropDueToLatenessTumbling() throws Exception { stateDesc, new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction>()), EventTimeTrigger.create(), - LATENESS); + LATENESS, + lateOutputTag); --- End diff -- wrong alignment
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105397424 --- Diff: flink-core/src/main/java/org/apache/flink/util/OutputTag.java --- @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.util; + +import static java.util.Objects.requireNonNull; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.Serializable; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; + + +/** + * An {@link OutputTag} is a typed and named tag to use for tagging side outputs + * of an operator. + * + * An {@code OutputTag} must always be an anonymous inner class so that Flink can derive + * a {@link TypeInformation} for the generic type parameter. + * + * Example: + * {@code + * OutputTag> info = new OutputTag >("late-data"){}); + * } + * + * @param the type of elements in the side-output stream. 
+ */ +@PublicEvolving +public class OutputTag implements Serializable { + + private static final long serialVersionUID = 1L; + + private final String id; + + private transient TypeInformation typeInfo; + + /** +* Creates a new named {@code OutputTag} with the given id. +* +* @param id The id of the created {@code OutputTag}. + */ + public OutputTag(String id) { + Preconditions.checkNotNull(id, "OutputTag id cannot be null."); + this.id = requireNonNull(id); + + try { --- End diff -- No need for line breaking: `TypeHint typeHint = new TypeHint(OutputTag.class, this, 0) {};`
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105372676 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java --- @@ -419,6 +435,14 @@ public void merge(W mergeResult, registerCleanupTimer(window); } } + + // side output input event if + // element not handled by any window + // late arriving tag has been set + // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp + if(isSkippedElement && lateDataOutputTag != null && isLate(element)) { --- End diff -- I just added a test for the behaviour with a "weird" `WindowAssigner`.
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105368360 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java --- @@ -419,6 +435,14 @@ public void merge(W mergeResult, registerCleanupTimer(window); } } + + // side output input event if + // element not handled by any window + // late arriving tag has been set + // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp + if(isSkippedElement && lateDataOutputTag != null && isLate(element)) { --- End diff -- I thought about this again. I think it doesn't hurt to keep the `isLate` check because it catches the case when a `WindowAssigner` doesn't assign any windows. In that case an element is also "skipped" but it is not necessarily considered late. What do you think?
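To make the point about the `isLate` check concrete, here is a simplified, self-contained model of the condition in the diff. It is a sketch, not the `WindowOperator` code: `LateDataRoutingSketch`, `Route`, and the method signatures are hypothetical, and the lateness formula (element timestamp plus allowed lateness not exceeding the current watermark) is an assumption based on the code comment in the diff.

```java
// Hypothetical, simplified model of the condition discussed in this thread.
// It does not use any Flink classes.
class LateDataRoutingSketch {

    enum Route { WINDOWED, SIDE_OUTPUT, DROPPED }

    // Assumed lateness rule: the element is late once the watermark has
    // passed its timestamp plus the allowed lateness.
    static boolean isLate(long elementTimestamp, long allowedLateness, long currentWatermark) {
        return elementTimestamp + allowedLateness <= currentWatermark;
    }

    static Route route(boolean isSkippedElement, boolean hasLateDataTag,
            long elementTimestamp, long allowedLateness, long currentWatermark) {
        if (!isSkippedElement) {
            // At least one window accepted the element.
            return Route.WINDOWED;
        }
        if (hasLateDataTag && isLate(elementTimestamp, allowedLateness, currentWatermark)) {
            // Skipped because it is late, and the user asked for late data.
            return Route.SIDE_OUTPUT;
        }
        // Skipped for another reason, e.g. the assigner returned no windows.
        return Route.DROPPED;
    }

    public static void main(String[] args) {
        // Late element with a tag set: goes to the side output.
        System.out.println(route(true, true, 10L, 5L, 20L));
        // Assigner produced no windows, but the element is not late: dropped.
        System.out.println(route(true, true, 10L, 5L, 12L));
    }
}
```

Without the `isLate` term, the second call would also land in the late-data side output, which is the case the comment above describes.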
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user chenqin commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105235710 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java --- @@ -419,6 +435,14 @@ public void merge(W mergeResult, registerCleanupTimer(window); } } + + // side output input event if + // element not handled by any window + // late arriving tag has been set + // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp + if(isSkippedElement && lateDataOutputTag != null && isLate(element)) { --- End diff -- Thanks @kl0u, good catch! I put `isLate` there with the intention of filtering out events dropped for other reasons that I may not be aware of. lateArrivingEvents is really `late arriving` and `dropped` events. @aljoscha If that is a redundant check, we might just remove `isLate`. What do you think?
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r105225561 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java --- @@ -419,6 +435,14 @@ public void merge(W mergeResult, registerCleanupTimer(window); } } + + // side output input event if + // element not handled by any window + // late arriving tag has been set + // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp + if(isSkippedElement && lateDataOutputTag != null && isLate(element)) { --- End diff -- @chenqin and @aljoscha I am starting to review the PR and I was wondering when this new `isLate()` check is needed. At least for the out-of-the-box window assigners, this seems to be a redundant check.
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user chenqin commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r104997566 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java --- @@ -60,6 +60,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import com.google.common.collect.Iterables; --- End diff -- That sounds right, good catch! Thanks for fixing!
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user chenqin commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r104996349 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java --- @@ -85,6 +86,7 @@ private Set sources; private Set sinks; private Map> virtualSelectNodes; + private Map > virtualOutputNodes; --- End diff -- sounds good
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user chenqin commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r104995971 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java --- @@ -333,32 +356,41 @@ public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int ty downStreamVertexID, typeNumber, null, - new ArrayList()); + new ArrayList(), null); } private void addEdgeInternal(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber, StreamPartitioner partitioner, - List outputNames) { - + List outputNames, + OutputTag outputTag) { - if (virtualSelectNodes.containsKey(upStreamVertexID)) { + if (virtualOutputNodes.containsKey(upStreamVertexID)) { + int virtualId = upStreamVertexID; + upStreamVertexID = virtualOutputNodes.get(virtualId).f0; + if (outputTag == null) { + // selections that happen downstream override earlier selections --- End diff -- sounds good to me!
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r104895136 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java --- @@ -333,32 +356,41 @@ public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int ty downStreamVertexID, typeNumber, null, - new ArrayList()); + new ArrayList(), null); } private void addEdgeInternal(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber, StreamPartitioner partitioner, - List outputNames) { - + List outputNames, + OutputTag outputTag) { - if (virtualSelectNodes.containsKey(upStreamVertexID)) { + if (virtualOutputNodes.containsKey(upStreamVertexID)) { + int virtualId = upStreamVertexID; + upStreamVertexID = virtualOutputNodes.get(virtualId).f0; + if (outputTag == null) { + // selections that happen downstream override earlier selections --- End diff -- I think the comment is a leftover from copying this code from split/select. For side outputs it can't happen that you have multiple "selects" after one another. Will remove the comment. What do you think?
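The diff in this comment resolves a virtual side-output node back to the real upstream vertex before an edge is added. A rough sketch of that idea in plain Java follows. All names here (`VirtualNodeSketch`, `VirtualNode`, `Edge`, `resolveEdge`) are hypothetical, the tag is modelled as a plain `String`, and taking the tag from the virtual node when none was passed in is an assumption, since the diff is cut off right after the `outputTag == null` check.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of resolving virtual nodes when adding an edge.
// It only mirrors the shape of the diff; it is not StreamGraph code.
class VirtualNodeSketch {

    // A virtual node remembers the real upstream vertex and the tag it selects.
    static final class VirtualNode {
        final int upstreamId;
        final String tag;

        VirtualNode(int upstreamId, String tag) {
            this.upstreamId = upstreamId;
            this.tag = tag;
        }
    }

    static final class Edge {
        final int sourceId;
        final int targetId;
        final String tag; // null means "main output"

        Edge(int sourceId, int targetId, String tag) {
            this.sourceId = sourceId;
            this.targetId = targetId;
            this.tag = tag;
        }
    }

    static Edge resolveEdge(Map<Integer, VirtualNode> virtualNodes,
            int upstreamId, int downstreamId, String tag) {
        // Follow virtual ids until a real vertex id is reached.
        while (virtualNodes.containsKey(upstreamId)) {
            VirtualNode virtual = virtualNodes.get(upstreamId);
            if (tag == null) {
                // Assumed behaviour: pick up the tag recorded on the virtual node.
                tag = virtual.tag;
            }
            upstreamId = virtual.upstreamId;
        }
        return new Edge(upstreamId, downstreamId, tag);
    }

    public static void main(String[] args) {
        Map<Integer, VirtualNode> virtualNodes = new HashMap<>();
        virtualNodes.put(100, new VirtualNode(1, "late-data"));

        Edge edge = resolveEdge(virtualNodes, 100, 2, null);
        // prints "1 -> 2 [late-data]"
        System.out.println(edge.sourceId + " -> " + edge.targetId + " [" + edge.tag + "]");
    }
}
```

Because a side output cannot be followed by another "select", as noted above, the loop would normally run at most once for a side-output node.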
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r104880615 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java --- @@ -85,6 +86,7 @@ private Set sources; private Set sinks; private Map> virtualSelectNodes; + private Map > virtualOutputNodes; --- End diff -- The method is already called `addVirtualSideOutputNode()`. I'm adjusting the name of the field. Thanks!
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r104881269 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java --- @@ -60,6 +60,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import com.google.common.collect.Iterables; --- End diff -- You're right, I'm changing this to simply have two loops. I think you introduced this in the first place, though.
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r104880412 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java --- @@ -416,4 +418,35 @@ private boolean canBeParallel() { transformation.setSlotSharingGroup(slotSharingGroup); return this; } + + /** +* Gets the {@link DataStream} that contains the elements that are emitted from an operation +* into the side output with the given {@link OutputTag}. +* +* Example: +* {@code +* static final OutputTag sideOutputTag = new OutputTag("side-output") {}; +* +* public void flatMap(X value, Collector out) throws Exception { --- End diff -- Fixing
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user chenqin commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r104846393 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java --- @@ -416,4 +418,35 @@ private boolean canBeParallel() { transformation.setSlotSharingGroup(slotSharingGroup); return this; } + + /** +* Gets the {@link DataStream} that contains the elements that are emitted from an operation +* into the side output with the given {@link OutputTag}. +* +* Example: +* {@code +* static final OutputTag sideOutputTag = new OutputTag("side-output") {}; +* +* public void flatMap(X value, Collector out) throws Exception { --- End diff -- The comment seems out of date. I think we already decided to get rid of CollectorWrapper.
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user chenqin commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r104847407 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java --- @@ -333,32 +356,41 @@ public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int ty downStreamVertexID, typeNumber, null, - new ArrayList()); + new ArrayList(), null); } private void addEdgeInternal(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber, StreamPartitioner partitioner, - List outputNames) { - + List outputNames, + OutputTag outputTag) { - if (virtualSelectNodes.containsKey(upStreamVertexID)) { + if (virtualOutputNodes.containsKey(upStreamVertexID)) { + int virtualId = upStreamVertexID; + upStreamVertexID = virtualOutputNodes.get(virtualId).f0; + if (outputTag == null) { + // selections that happen downstream override earlier selections --- End diff -- We may want to call out this behavior in the `getSideOutput` comments.
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user chenqin commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r104847832 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java --- @@ -439,6 +450,7 @@ public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) { headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS) && (edge.getPartitioner() instanceof ForwardPartitioner) && upStreamVertex.getParallelism() == downStreamVertex.getParallelism() + && edge.getOutputTag() == null // disable chaining for side outputs --- End diff -- I remember you mentioned that in the latest version side outputs work with chaining. Is that still the case?
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user chenqin commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r104847733 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java --- @@ -60,6 +60,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import com.google.common.collect.Iterables; --- End diff -- Do you think introducing this dependency is a good idea or a bad idea? Up to you :)
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
Github user chenqin commented on a diff in the pull request: https://github.com/apache/flink/pull/3484#discussion_r104847005 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java --- @@ -85,6 +86,7 @@ private Set sources; private Set sinks; private Map> virtualSelectNodes; + private Map > virtualOutputNodes; --- End diff -- We might consider using `addVirtualSideOutputNode` and `virtualSideOutputNodes`, unless we want to refactor and move away from the current assumption `operator` to `< ... operator < ... `
[GitHub] flink pull request #3484: [FLINK-4460] Side Outputs in Flink
GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/3484

[FLINK-4460] Side Outputs in Flink

This is a refinement of #2982 by @chenqin. I changed the API a bit, added support for side outputs to `ProcessFunction`, enabled side outputs to work with chaining, added a proper Scala API and a Scala API test, and added documentation.

R: @uce @kl0u and @chenqin for review, please

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink finish-pr-2982-side-outputs-cp

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3484.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3484

commit 1746c7e5981942dfb0e57954f8241a400b699120
Author: Chen Qin
Date: 2016-10-21T19:38:04Z
[FLINK-4460] Add support for side outputs
This does not yet allow users to emit to side outputs in user functions. Only operators (StreamOperator) can emit to side outputs. A side output can be retrieved on a SingleOutputStreamOperator.

commit 362fcb38abf525e704e97505186923ae77dc167b
Author: Aljoscha Krettek
Date: 2016-10-21T19:38:04Z
[FLINK-4460] Add side outputs for ProcessFunction

commit 3ff4f8284691e9cbfba6f4c23b4e7fe1c584df50
Author: Aljoscha Krettek
Date: 2017-02-16T13:09:26Z
[FLINK-4460] Make chaining work with side outputs

commit 5a48cdfb67a1008bf4c7cba8ff76b2982db55a40
Author: Aljoscha Krettek
Date: 2017-02-16T13:41:10Z
[FLINK-4460] Expose OutputTag constructor that takes TypeInformation

commit 935ff90dadad380db293265cc49d072d764cf510
Author: Chen Qin
Date: 2017-03-01T14:36:17Z
[FLINK-4460] Provide late-data output for window operations
We use side outputs to emit dropped late data.

commit 3ae8a673c6c29fe2e110f5745ddf72deae71aafd
Author: Aljoscha Krettek
Date: 2017-03-07T10:06:31Z
[FLINK-4460] Add proper side output API for Scala API
The Scala side output API uses context bounds to get a TypeInformation for an OutputTag. This also adds a SideOutputITCase for the Scala API.

commit 66182b79931016a04286279f5aba63af30442b02
Author: Aljoscha Krettek
Date: 2017-03-07T11:06:01Z
[FLINK-4460] Add documentation for side outputs