[jira] [Commented] (FLINK-3974) enableObjectReuse fails when an operator chains to multiple downstream operators
[ https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16233964#comment-16233964 ]

ASF GitHub Bot commented on FLINK-3974:
---

Github user XuPingyong commented on the issue:

https://github.com/apache/flink/pull/2110

I do not agree with this PR, as it always copies the StreamRecord for the downstream operator. StreamMap changes the input StreamRecord, so this PR works well for it. But many operators, like StreamFlatMap, do not change or reuse the input StreamRecord. The following code does not need the extra copy:

`DataStream input = ... input .flatMap(FlatMapFunction...) .addSink(...); input .flatMap(FlatMapFunction...) .addSink(...);`

So I think we can change StreamMap to not reuse the input StreamRecord, and directly send the StreamRecord if objectReuse is set to true. What do you think?

> enableObjectReuse fails when an operator chains to multiple downstream operators
>
> Key: FLINK-3974
> URL: https://issues.apache.org/jira/browse/FLINK-3974
> Project: Flink
> Issue Type: Bug
> Components: DataStream API
> Affects Versions: 1.0.3
> Reporter: B Wyatt
> Assignee: Aljoscha Krettek
> Priority: Major
> Fix For: 1.1.0
>
> Attachments: ReproFLINK3974.java, bwyatt-FLINK3974.1.patch
>
> Given a topology that looks like this:
> {code:java}
> DataStream input = ...
> input
>     .map(MapFunction...)
>     .addSink(...);
> input
>     .map(MapFunction...)
>     .addSink(...);
> {code}
> enableObjectReuse() will cause an exception in the form of {{"java.lang.ClassCastException: B cannot be cast to A"}} to be thrown.
> It looks like the input operator calls {{Output.collect}}, which attempts to loop over the downstream operators and process them.
> However, the first map operation will call {{StreamRecord<>.replace}}, which mutates the value stored in the StreamRecord<>.
> As a result, when the {{Output.collect}} call passes the {{StreamRecord<A>}} to the second map operation, it is actually a {{StreamRecord<B>}} and behaves as if the two map operations were serial instead of parallel.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
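To make the failure mode concrete, here is a minimal, self-contained Java sketch. It does not use Flink: `Rec`, `mutatingMap`, `freshMap` and `broadcast` are hypothetical stand-ins for `StreamRecord`, a map operator that calls `replace` on its input (as the issue describes), an operator that emits a new holder instead (the change XuPingyong proposes above), and an output that hands the same, uncopied record to every chained branch. Both branches expect a `String`; with the mutating operator, the second branch receives the `Integer` written by the first, which mirrors the "B cannot be cast to A" error.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/** Hypothetical model of the FLINK-3974 failure; none of these types are Flink classes. */
public class ObjectReuseSketch {

    /** Mutable record holder, loosely modelled on StreamRecord. */
    static final class Rec {
        Object value;

        Rec(Object value) { this.value = value; }

        /** Overwrites the value in place, like StreamRecord#replace. */
        Rec replace(Object newValue) {
            this.value = newValue;
            return this;
        }
    }

    /** Operator that reuses (overwrites) its input holder. */
    static <A, B> Function<Rec, Rec> mutatingMap(Class<A> inType, Function<A, B> f) {
        // Class.cast throws ClassCastException if the holder no longer contains an A.
        return rec -> rec.replace(f.apply(inType.cast(rec.value)));
    }

    /** Operator that leaves its input untouched and emits a new holder. */
    static <A, B> Function<Rec, Rec> freshMap(Class<A> inType, Function<A, B> f) {
        return rec -> new Rec(f.apply(inType.cast(rec.value)));
    }

    /** Hands the same holder to every chained branch, without copying. */
    static List<Object> broadcast(Rec rec, List<Function<Rec, Rec>> branches) {
        List<Object> results = new ArrayList<>();
        for (Function<Rec, Rec> branch : branches) {
            results.add(branch.apply(rec).value);
        }
        return results;
    }

    public static void main(String[] args) {
        // Non-mutating branches: both see the original String.
        List<Object> ok = broadcast(new Rec("flink"), Arrays.asList(
                freshMap(String.class, String::length),
                freshMap(String.class, String::toUpperCase)));
        System.out.println(ok); // [5, FLINK]

        // Mutating branches: the second one receives the Integer written by the first.
        try {
            broadcast(new Rec("flink"), Arrays.asList(
                    mutatingMap(String.class, String::length),
                    mutatingMap(String.class, String::toUpperCase)));
        } catch (ClassCastException e) {
            System.out.println("second branch failed: " + e.getMessage());
        }
    }
}
```

In this model, broadcasting without a copy is only safe when no branch writes to the shared holder; the non-mutating variant pays for that with one new holder per operator invocation instead of one copy per branch.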
[jira] [Commented] (FLINK-3974) enableObjectReuse fails when an operator chains to multiple downstream operators
[ https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15351196#comment-15351196 ]

ASF GitHub Bot commented on FLINK-3974:
---

Github user aljoscha closed the pull request at:

https://github.com/apache/flink/pull/2110
[jira] [Commented] (FLINK-3974) enableObjectReuse fails when an operator chains to multiple downstream operators
[ https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15350557#comment-15350557 ]

ASF GitHub Bot commented on FLINK-3974:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2110

Aw man, I already had "rename to Test" in my commit but forgot to add that... Thanks again, @tillrohrmann, I'll make the changes and merge.
[jira] [Commented] (FLINK-3974) enableObjectReuse fails when an operator chains to multiple downstream operators
[ https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346488#comment-15346488 ]

ASF GitHub Bot commented on FLINK-3974:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2110

Excellent work @aljoscha. Great fix for the problem and it's also really nice that we could get rid of the IT case :-)

+1 for merging after addressing my minor comments.
[jira] [Commented] (FLINK-3974) enableObjectReuse fails when an operator chains to multiple downstream operators
[ https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346486#comment-15346486 ]

ASF GitHub Bot commented on FLINK-3974:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2110#discussion_r68241018

--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ChainingITCase.java ---
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SplitStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorChain;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.hamcrest.Matchers.contains;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Tests for stream operator chaining behaviour.
+ */
+public class ChainingITCase {
--- End diff --

I think that this test case no longer needs to be an IT case.
[jira] [Commented] (FLINK-3974) enableObjectReuse fails when an operator chains to multiple downstream operators
[ https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346484#comment-15346484 ]

ASF GitHub Bot commented on FLINK-3974:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2110#discussion_r68240257

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ---
@@ -386,4 +402,23 @@ public void close() {
             }
         }
     }
+
+    /**
+    * Special version of {@link BroadcastingOutputCollector} that performs a shallow copy of the
+    * {@link StreamRecord} to ensure that multi-chaining works correctly.
+    */
+    private static final class CopyingBroadcastingOutputCollector extends BroadcastingOutputCollector {
+
+        public CopyingBroadcastingOutputCollector(Output[] outputs) {
+            super(outputs);
+        }
+
+        @Override
+        public void collect(StreamRecord record) {
+            for (Output output : outputs) {
+                StreamRecord shallowCopy = record.copy(record.getValue());
+                output.collect(shallowCopy);
+            }
--- End diff --

Here, the same. I think we could save one copying operation by giving the original `record` to the last output.
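The "give the original `record` to the last output" suggestion can be sketched in isolation. The Java below is a hypothetical model, not Flink's `CopyingBroadcastingOutputCollector`: `Rec`, `Output`, `CopyingBroadcaster` and `Overwriter` are illustrative names, and the empty-outputs guard is an addition of this sketch. Each downstream operator overwrites the holder it receives, as a `StreamMap`-like operator would, yet every branch still sees the original value while only `n - 1` shallow copies are made.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of a copying broadcast output; not Flink's implementation. */
public class CopyingBroadcastSketch {

    /** Stand-in for StreamRecord: a mutable holder around a value. */
    static final class Rec<T> {
        T value;
        Rec(T value) { this.value = value; }
        /** Shallow copy: a new holder that points at the same value object. */
        Rec<T> copy(T newValue) { return new Rec<>(newValue); }
    }

    interface Output<T> {
        void collect(Rec<T> record);
    }

    /** Gives every output but the last its own holder; the last one receives the original. */
    static final class CopyingBroadcaster<T> implements Output<T> {
        private final Output<T>[] outputs;
        int copiesMade; // only for illustration

        @SafeVarargs
        CopyingBroadcaster(Output<T>... outputs) { this.outputs = outputs; }

        @Override
        public void collect(Rec<T> record) {
            if (outputs.length == 0) {
                return; // nothing to do; also avoids indexing outputs[-1]
            }
            for (int i = 0; i < outputs.length - 1; i++) {
                copiesMade++;
                outputs[i].collect(record.copy(record.value));
            }
            outputs[outputs.length - 1].collect(record);
        }
    }

    /** Downstream operator that overwrites the holder it receives. */
    static final class Overwriter implements Output<Object> {
        final List<Object> seen = new ArrayList<>();

        @Override
        public void collect(Rec<Object> record) {
            seen.add(record.value);
            record.value = "replaced";
        }
    }

    public static void main(String[] args) {
        Overwriter a = new Overwriter(), b = new Overwriter(), c = new Overwriter();
        CopyingBroadcaster<Object> out = new CopyingBroadcaster<>(a, b, c);
        out.collect(new Rec<>("input"));
        System.out.println(a.seen + " " + b.seen + " " + c.seen); // [input] [input] [input]
        System.out.println(out.copiesMade); // 2
    }
}
```

Handing the original to the last output is safe here because the broadcaster never reads the record again after that final `collect` call.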
[jira] [Commented] (FLINK-3974) enableObjectReuse fails when an operator chains to multiple downstream operators
[ https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346476#comment-15346476 ]

ASF GitHub Bot commented on FLINK-3974:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2110#discussion_r68239970

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/CopyingDirectedOutput.java ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.collector.selector;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.graph.StreamEdge;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.List;
+import java.util.Set;
+
+
+/**
+ * Special version of {@link DirectedOutput} that performs a shallow copy of the
+ * {@link StreamRecord} to ensure that multi-chaining works correctly.
+ */
+public class CopyingDirectedOutput extends DirectedOutput {
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    public CopyingDirectedOutput(
+            ListoutputSelectors,
+            List , StreamEdge>> outputs) {
+        super(outputSelectors, outputs);
+    }
+
+    @Override
+    public void collect(StreamRecord record) {
+        Set
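The quoted `CopyingDirectedOutput` diff breaks off at `Set`, so its real `collect` body is not shown here, and the following is not a reconstruction of it. It is a hypothetical Java sketch of how the same "copy for every target but the last" idea could apply to a directed output of the kind used by `split()`/`select()`. A plain `Function<Object, Iterable<String>>` stands in for an output selector that maps a value to output names; `CopyingDirected`, `Recorder` and `evenOddRouter` are illustrative names. Resolving the distinct targets first is what makes it possible to know which target is last.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

/** Hypothetical model of a copying directed (split/select) output; not Flink code. */
public class CopyingDirectedSketch {

    static final class Rec {
        Object value;
        Rec(Object value) { this.value = value; }
    }

    interface Output {
        void collect(Rec record);
    }

    static final class CopyingDirected implements Output {
        private final Function<Object, Iterable<String>> selector;
        private final Map<String, List<Output>> outputsByName;
        int copiesMade; // only for illustration

        CopyingDirected(Function<Object, Iterable<String>> selector, Map<String, List<Output>> outputsByName) {
            this.selector = selector;
            this.outputsByName = outputsByName;
        }

        @Override
        public void collect(Rec record) {
            // Resolve the distinct target outputs first so we know which one is last.
            Set<Output> selected = new LinkedHashSet<>();
            for (String name : selector.apply(record.value)) {
                selected.addAll(outputsByName.getOrDefault(name, Collections.emptyList()));
            }
            int remaining = selected.size();
            for (Output out : selected) {
                remaining--;
                if (remaining > 0) {
                    copiesMade++;
                    out.collect(new Rec(record.value)); // shallow copy: new holder, same value
                } else {
                    out.collect(record); // the last target may reuse the original holder
                }
            }
        }
    }

    /** Downstream operator that records what it sees and then overwrites the holder. */
    static final class Recorder implements Output {
        final List<Object> seen = new ArrayList<>();

        @Override
        public void collect(Rec record) {
            seen.add(record.value);
            record.value = "replaced";
        }
    }

    /** Even numbers go to "even" and "all"; odd numbers only to "all". */
    static CopyingDirected evenOddRouter(Recorder evens, Recorder all) {
        Map<String, List<Output>> byName = new HashMap<>();
        byName.put("even", Collections.<Output>singletonList(evens));
        byName.put("all", Collections.<Output>singletonList(all));
        return new CopyingDirected(
                v -> ((Integer) v) % 2 == 0 ? Arrays.asList("even", "all") : Collections.singletonList("all"),
                byName);
    }

    public static void main(String[] args) {
        Recorder evens = new Recorder();
        Recorder all = new Recorder();
        CopyingDirected out = evenOddRouter(evens, all);
        out.collect(new Rec(4)); // two targets: one copy
        out.collect(new Rec(3)); // one target: no copy
        System.out.println(evens.seen + " " + all.seen + " copies=" + out.copiesMade);
        // [4] [4, 3] copies=1
    }
}
```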
[jira] [Commented] (FLINK-3974) enableObjectReuse fails when an operator chains to multiple downstream operators
[ https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346340#comment-15346340 ]

ASF GitHub Bot commented on FLINK-3974:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2110

@tillrohrmann I pushed a commit that removes the per-operator object-reuse setting, refactors broadcasting and direct outputs and changes the ITCase to a test case. Happy reviewing.
[jira] [Commented] (FLINK-3974) enableObjectReuse fails when an operator chains to multiple downstream operators
[ https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346162#comment-15346162 ]

ASF GitHub Bot commented on FLINK-3974:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2110

I'll try to come up with something. I'll probably remove `isInputCopyDisabled` from the operator and only allow a global setting for object reuse. This should simplify things.
[jira] [Commented] (FLINK-3974) enableObjectReuse fails when an operator chains to multiple downstream operators
[ https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346141#comment-15346141 ]

ASF GitHub Bot commented on FLINK-3974:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2110

Thanks for the thorough review, @tillrohrmann! Your points are valid and maybe I'll have to change this PR, but let me first explain my reasoning.

The shallow copy is performed in the one place that all code paths have to go through, because it is the point right before control is passed to the operator. Putting it in a different place would mean placing it in `BroadcastingOutputCollector`, as you mentioned, as well as in https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java, which is used when the user does a split()/select() operation (`DataStream.split()`). The number of places where we have to put this might grow in the future. Also, putting it in `BroadcastingOutputCollector` and `DirectedOutput` would mean that we always do two copies per record for the common case of having object copying enabled (which is the default).

About the ITCase: I also don't like having it in there, because we are approaching the 2h mark on Travis, but I think in this case it's valid. This test really verifies that the whole system works correctly when the user uses a certain feature. (Now that I think about it, I would also add a test for split()/select().)
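The two-copies argument can be made concrete with a small counting model. The Java below is hypothetical (`Rec`, `Output`, `Counters`, `chaining`, `broadcaster` and `run` are illustrative names, not Flink's API). It assumes that with object reuse disabled each chaining output already hands its operator a fresh copy (modelled as a counted "deep" copy standing in for a serializer copy), and that with reuse enabled records pass through the chaining output unchanged. Under those assumptions, copying unconditionally in the broadcaster adds extra shallow copies in the default mode, while copying in the broadcaster only when object reuse is enabled (the single global flag the later comments move toward) still gives every branch its own holder; dropping the copy entirely with reuse enabled reproduces the bug.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of where copies happen in a branching operator chain; not Flink code. */
public class CopyPlacementSketch {

    static final class Rec {
        String value;
        Rec(String value) { this.value = value; }
    }

    interface Output {
        void collect(Rec record);
    }

    static final class Counters {
        int deepCopies;    // stands in for copying the value with a type serializer
        int shallowCopies; // new holder, same value
    }

    /** Chaining output: when object reuse is disabled it hands the operator a deep copy. */
    static Output chaining(boolean objectReuse, Output operator, Counters c) {
        if (objectReuse) {
            return operator::collect;
        }
        return r -> {
            c.deepCopies++;
            operator.collect(new Rec(new String(r.value)));
        };
    }

    /** Broadcaster: optionally gives every output but the last its own holder. */
    static Output broadcaster(boolean shallowCopy, List<Output> outs, Counters c) {
        return r -> {
            for (int i = 0; i < outs.size(); i++) {
                if (shallowCopy && i < outs.size() - 1) {
                    c.shallowCopies++;
                    outs.get(i).collect(new Rec(r.value));
                } else {
                    outs.get(i).collect(r);
                }
            }
        };
    }

    /** Sends one record to `branches` chained operators that each overwrite their input holder. */
    static Counters run(boolean objectReuse, boolean copyInBroadcaster, int branches, List<String> seen) {
        Counters c = new Counters();
        List<Output> outs = new ArrayList<>();
        for (int i = 0; i < branches; i++) {
            Output operator = r -> {
                seen.add(r.value);
                r.value = "replaced";
            };
            outs.add(chaining(objectReuse, operator, c));
        }
        broadcaster(copyInBroadcaster, outs, c).collect(new Rec("input"));
        return c;
    }

    public static void main(String[] args) {
        // Object reuse disabled (the default) plus an unconditional broadcaster copy.
        Counters always = run(false, true, 3, new ArrayList<>());
        System.out.println("always: " + always.deepCopies + " deep, " + always.shallowCopies + " shallow"); // always: 3 deep, 2 shallow

        // Copy in the broadcaster only when object reuse is enabled.
        Counters noReuse = run(false, false, 3, new ArrayList<>());
        Counters reuse = run(true, true, 3, new ArrayList<>());
        System.out.println("reuse off: " + noReuse.deepCopies + " deep, " + noReuse.shallowCopies + " shallow"); // reuse off: 3 deep, 0 shallow
        System.out.println("reuse on: " + reuse.deepCopies + " deep, " + reuse.shallowCopies + " shallow"); // reuse on: 0 deep, 2 shallow

        // Object reuse enabled and no copy anywhere: the FLINK-3974 situation.
        List<String> broken = new ArrayList<>();
        run(true, false, 3, broken);
        System.out.println(broken); // [input, replaced, replaced]
    }
}
```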
[jira] [Commented] (FLINK-3974) enableObjectReuse fails when an operator chains to multiple downstream operators
[ https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15344550#comment-15344550 ]

ASF GitHub Bot commented on FLINK-3974:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2110

Thanks for the contribution @wanderingbort and @aljoscha. I might be wrong, but maybe there is a slightly better place for the copying operation. Furthermore, I think that it would be beneficial if we could test the correct behaviour without instantiating an expensive IT case.
[jira] [Commented] (FLINK-3974) enableObjectReuse fails when an operator chains to multiple downstream operators
[ https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15344541#comment-15344541 ]

ASF GitHub Bot commented on FLINK-3974:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2110#discussion_r68077418

--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ChainingITCase.java ---
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.junit.Test;
+
+/**
+ * Tests for stream operator chaining behaviour.
+ */
+public class ChainingITCase extends StreamingMultipleProgramsTestBase {
--- End diff --

Do we have to instantiate an expensive IT case to test the correct chaining behaviour? Maybe we could save some cycles if we don't start a complete Flink cluster for that and instead test the chained task directly.
[jira] [Commented] (FLINK-3974) enableObjectReuse fails when an operator chains to multiple downstream operators
[ https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15344537#comment-15344537 ]

ASF GitHub Bot commented on FLINK-3974:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2110#discussion_r68076977

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ---
@@ -306,8 +306,9 @@ public ChainingOutput(OneInputStreamOperatoroperator) {
         @Override
         public void collect(StreamRecord record) {
             try {
-                operator.setKeyContextElement1(record);
-                operator.processElement(record);
+                StreamRecord shallowCopy = record.copy(record.getValue());
+                operator.setKeyContextElement1(shallowCopy);
+                operator.processElement(shallowCopy);
--- End diff --

Actually, I'm wondering whether `ChainingOutput` is the right place to do this copying. Wouldn't it make more sense to do it in `BroadcastingOutputCollector`? Only if we have a branching chained data flow do we have to make sure that every downstream operator gets its own copy of the record. For simple chaining, it should be correct to reuse the stream record. So I would adapt the `collect` method of `BroadcastingOutputCollector` the following way:

```
public void collect(StreamRecord record) {
    for (int i = 0; i < outputs.length - 1; i++) {
        StreamRecord shallowCopy = record.copy(record.getValue());
        outputs[i].collect(shallowCopy);
    }

    outputs[outputs.length - 1].collect(record);
}
```
[jira] [Commented] (FLINK-3974) enableObjectReuse fails when an operator chains to multiple downstream operators
[ https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15333422#comment-15333422 ] ASF GitHub Bot commented on FLINK-3974: --- GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/2110 [FLINK-3974] Fix object reuse with multi-chaining Before, a job would fail if object reuse was enabled and multiple operators were chained to one upstream operator. Now, we always create a shallow copy of the StreamRecord in OperatorChain.ChainingOutput because downstream operations change/reuse the StreamRecord. This fix was contributed by @wanderingbort (if this is the right github handle) as a patch on the Flink Jira. I can change the commit to attribute it to him but so far he didn't respond to my question about this on Jira. You can merge this pull request into a Git repository by running: $ git pull https://github.com/aljoscha/flink chaining/fix Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2110.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2110 commit 092f350cccbda32331f527c4eaf7ad3304fa1811 Author: Aljoscha KrettekDate: 2016-06-14T10:18:35Z [FLINK-3974] Fix object reuse with multi-chaining Before, a job would fail if object reuse was enabled and multiple operators were chained to one upstream operator. Now, we always create a shallow copy of the StreamRecord in OperatorChain.ChainingOutput because downstream operations change/reuse the StreamRecord. 
[jira] [Commented] (FLINK-3974) enableObjectReuse fails when an operator chains to multiple downstream operators
[ https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15329226#comment-15329226 ] Aljoscha Krettek commented on FLINK-3974: - [~wanderingbort] do you want to open a PR with your fix or should I create a commit for you with the fix?
[jira] [Commented] (FLINK-3974) enableObjectReuse fails when an operator chains to multiple downstream operators
[ https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15304062#comment-15304062 ] B Wyatt commented on FLINK-3974: In the topologies I've been building, object reuse has been a pretty significant CPU win. It would be a shame to lose that capability for topologies where care is taken to maintain the application-level requirement of object reuse: don't modify your inputs. Disabling chaining is a fine option if this is not a bottleneck in your topology. If unchained splits become the default, I'd like to see the ability to chain them remain as an option. Sometimes splitting into multiple chains/threads is good (CPU-heavy operators that benefit from parallelism). Sometimes maintaining the chain to avoid the de/serialization costs is good (CPU-light operators with high throughput). The attached patch essentially gives each collector its own dedicated stream record with a reference to the value. You are correct that it doesn't solve the problem of mutable operator inputs, but that has always been the primary concession of object reuse, and it doesn't seem like a bad requirement to put on the application level. I don't know that there is a way to avoid the cost of cloning *and* protect against operators mutating input in a language like Java.
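The dedicated-record approach and the caveat about mutable inputs can be sketched in the same kind of toy model. Assumptions: `DedicatedOutput`, `MutableRecord.set` and `broadcast` are hypothetical names, and this is not the attached patch. Each output owns one wrapper that is refilled for every element, which avoids the ClassCastException without a per-element allocation, but all wrappers point at the same value object, so a function that modifies its input leaks that change into the sibling branch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class DedicatedRecordFanOut {
    // Hypothetical mutable wrapper (not Flink's StreamRecord).
    static final class MutableRecord<T> {
        private Object value;

        @SuppressWarnings("unchecked")
        T getValue() { return (T) value; }

        // Refill this wrapper with a new element of the input type.
        MutableRecord<T> set(T newValue) {
            this.value = newValue;
            return this;
        }

        @SuppressWarnings("unchecked")
        <X> MutableRecord<X> replace(X newValue) {
            this.value = newValue;
            return (MutableRecord<X>) this;
        }
    }

    interface Operator<I> { void process(MutableRecord<I> rec); }

    static <I, O> Operator<I> mapper(Function<I, O> fn, List<Object> sink) {
        return rec -> sink.add(rec.replace(fn.apply(rec.getValue())).getValue());
    }

    // Hypothetical per-collector output: one wrapper per downstream operator,
    // allocated once and refilled for every element.
    static final class DedicatedOutput<T> {
        private final MutableRecord<T> dedicated = new MutableRecord<>();
        private final Operator<T> downstream;

        DedicatedOutput(Operator<T> downstream) { this.downstream = downstream; }

        void collect(T value) { downstream.process(dedicated.set(value)); }
    }

    // Fan-out: the wrappers differ, but all of them reference the same value object.
    static <T> void broadcast(T value, List<DedicatedOutput<T>> outputs) {
        for (DedicatedOutput<T> out : outputs) {
            out.collect(value);
        }
    }

    public static List<Object> run() {
        List<Object> sink = new ArrayList<>();
        // This mapper breaks the "don't modify your inputs" rule by appending to its input.
        Operator<StringBuilder> mutating = mapper((StringBuilder sb) -> sb.append('!').length(), sink);
        Operator<StringBuilder> reading = mapper((StringBuilder sb) -> sb.toString(), sink);
        broadcast(new StringBuilder("abc"),
                List.of(new DedicatedOutput<>(mutating), new DedicatedOutput<>(reading)));
        return sink;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [4, abc!]
    }
}
```

No exception is thrown, but the second branch sees "abc!" instead of "abc", which is the hazard both comments point out when user functions mutate their inputs.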
[jira] [Commented] (FLINK-3974) enableObjectReuse fails when an operator chains to multiple downstream operators
[ https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302127#comment-15302127 ] Stephan Ewen commented on FLINK-3974: - Yes, that is a pretty clear bug. I guess the best workaround for now is to disable object reuse mode. Object reuse does not really work well in the DataStream API right now; it works pretty well in the DataSet API. Another quick workaround is to not chain the two different map functions ({{.disableChaining()}}). The solution should be quite straightforward, though: - Do not chain "splitting" flows any more. I would actually like that solution. For splitting flows, it seems like a good heuristic to start a new chain/thread by default. - Each collector should use its own dedicated stream record. That would circumvent the ClassCastException at least, but would still be dangerous if the mappers actually alter the events.
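The first proposed solution (start a new chain for splitting flows by default) amounts to a chaining rule along the lines below. This is a toy sketch, not Flink's actual chaining logic: the graph class, `addEdge`, `isChainable` and the extra single-input condition are assumptions made for illustration. The topology is the one from the attached repro, with a map between the source and the split.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SplitChainingHeuristic {
    // Toy job graph (hypothetical, not Flink's StreamGraph): node -> downstream nodes.
    private final Map<String, List<String>> outEdges = new HashMap<>();
    private final Map<String, Integer> inDegree = new HashMap<>();

    void addEdge(String from, String to) {
        outEdges.computeIfAbsent(from, k -> new ArrayList<>()).add(to);
        inDegree.merge(to, 1, Integer::sum);
    }

    // Heuristic from the comment: an edge leaving a "splitting" node (more than one
    // downstream operator) is never chained, so each branch starts a new chain/thread.
    // The in-degree check is an extra assumption of this sketch.
    boolean isChainable(String from, String to) {
        return outEdges.getOrDefault(from, List.of()).size() == 1
                && inDegree.getOrDefault(to, 0) == 1;
    }

    List<String> chainedEdges() {
        List<String> chained = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : outEdges.entrySet()) {
            for (String to : e.getValue()) {
                if (isChainable(e.getKey(), to)) {
                    chained.add(e.getKey() + "->" + to);
                }
            }
        }
        chained.sort(null); // natural order, so the result does not depend on HashMap order
        return chained;
    }

    // source -> map0 -> {mapA -> sinkA, mapB -> sinkB}
    public static List<String> reproTopology() {
        SplitChainingHeuristic g = new SplitChainingHeuristic();
        g.addEdge("source", "map0");
        g.addEdge("map0", "mapA");
        g.addEdge("map0", "mapB");
        g.addEdge("mapA", "sinkA");
        g.addEdge("mapB", "sinkB");
        return g.chainedEdges();
    }

    public static void main(String[] args) {
        System.out.println(reproTopology()); // prints [mapA->sinkA, mapB->sinkB, source->map0]
    }
}
```

Under this rule the two edges leaving map0 are not chained, so the shared-record problem cannot arise there; as noted in the thread, the trade-off is extra serialization between chains for CPU-light, high-throughput operators.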
[jira] [Commented] (FLINK-3974) enableObjectReuse fails when an operator chains to multiple downstream operators
[ https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15300642#comment-15300642 ] B Wyatt commented on FLINK-3974: After attempting to produce a minimal repro, I realized that there needs to be an operation between the source and the parallel maps:
{code:java}
DataStream input = ...
input = input
    .map(MapFunction...);
input
    .map(MapFunction...)
    .addSink(...);
input
    .map(MapFunction...)
    .addSink(...);
{code}
See the attached repro case.