[jira] [Commented] (FLINK-3974) enableObjectReuse fails when an operator chains to multiple downstream operators

2017-11-01 ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16233964#comment-16233964
 ] 

ASF GitHub Bot commented on FLINK-3974:
---

Github user XuPingyong commented on the issue:

https://github.com/apache/flink/pull/2110
  
I do not agree with this PR, as it always copies the StreamRecord for the
downstream operator.

StreamMap changes the input StreamRecord, so this PR works well for it. But many
operators, such as StreamFlatMap, do not change or reuse the input StreamRecord.

The following code does not need the extra copy:
```java
DataStream input = ...
input
    .flatMap(FlatMapFunction...)
    .addSink(...);

input
    .flatMap(FlatMapFunction...)
    .addSink(...);
```

So I think we could change StreamMap so that it does not reuse the input
StreamRecord, and then pass the StreamRecord on directly when object reuse is
enabled. What do you think?
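XuPingyong's alternative can be sketched with a small model. `Rec`, `Out`, `NonReusingMap` and `PassThroughDemo` are hypothetical stand-ins, not Flink classes: the map operator allocates a fresh output record instead of overwriting its input, so the chained output can hand the same input instance to every branch without any copy. The sketch assumes that every operator in the chain follows this rule; a single operator that mutates its input would bring the problem back.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical stand-in for StreamRecord: a value plus a timestamp. */
final class Rec<T> {
    private final T value;
    private final long timestamp;

    Rec(T value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }

    T getValue() { return value; }
    long getTimestamp() { return timestamp; }
}

/** Hypothetical downstream output. */
interface Out<T> {
    void collect(Rec<T> record);
}

/** Hypothetical map operator that never touches its input record: it always emits a new one. */
final class NonReusingMap<A, B> {
    private final Function<A, B> fn;
    private final Out<B> out;

    NonReusingMap(Function<A, B> fn, Out<B> out) {
        this.fn = fn;
        this.out = out;
    }

    void processElement(Rec<A> in) {
        // New record for the result; the timestamp is carried over and the input stays intact.
        out.collect(new Rec<>(fn.apply(in.getValue()), in.getTimestamp()));
    }
}

final class PassThroughDemo {
    /** Sends ONE shared record to two branches with different output types, without copying it. */
    static List<Object> run(int input) {
        List<Object> sink = new ArrayList<>();
        NonReusingMap<Integer, String> toText =
                new NonReusingMap<Integer, String>(i -> "v" + i, r -> sink.add(r.getValue()));
        NonReusingMap<Integer, Integer> doubled =
                new NonReusingMap<Integer, Integer>(i -> i * 2, r -> sink.add(r.getValue()));

        Rec<Integer> shared = new Rec<>(input, 1000L);
        toText.processElement(shared);
        doubled.processElement(shared); // still sees the original Integer
        return sink;
    }

    public static void main(String[] args) {
        System.out.println(run(21)); // [v21, 42]
    }
}
```

The copy cost moves from the chaining code into the operators that produce new values, which is why the proposal only pays off if no chained operator reuses its input record.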


> enableObjectReuse fails when an operator chains to multiple downstream 
> operators
> 
>
> Key: FLINK-3974
> URL: https://issues.apache.org/jira/browse/FLINK-3974
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.0.3
>Reporter: B Wyatt
>Assignee: Aljoscha Krettek
>Priority: Major
> Fix For: 1.1.0
>
> Attachments: ReproFLINK3974.java, bwyatt-FLINK3974.1.patch
>
>
> Given a topology that looks like this:
> {code:java}
> DataStream input = ...
> input
> .map(MapFunction...)
> .addSink(...);
> input
> .map(MapFunction...)
> .addSink(...);
> {code}
> enableObjectReuse() will cause an exception of the form 
> {{"java.lang.ClassCastException: B cannot be cast to A"}} to be thrown.
> It looks like the input operator calls {{Output.collect}}, which loops over 
> the downstream operators and has each of them process the record.
> However, the first map operation calls {{StreamRecord<>.replace}}, which 
> mutates the value stored in the shared StreamRecord<>.
> As a result, when {{Output.collect}} passes the {{StreamRecord<A>}} on to the 
> second map operation, it is actually a {{StreamRecord<B>}}, and the two map 
> operations behave as if they were chained serially instead of running in 
> parallel.
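The failure described in the issue can be reproduced without Flink. In the sketch below, `Rec` is a hypothetical stand-in for `StreamRecord` whose `replace` overwrites the value in place and returns the same instance, and the two calls in `broadcast` play the role of the loop inside `Output.collect`. It is a simplified model of the aliasing, not Flink's code; `broadcastWithCopies` shows the effect of giving each branch its own shallow copy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical stand-in for StreamRecord: replace() overwrites the value and returns this same instance. */
final class Rec<T> {
    private Object value;

    Rec(T value) { this.value = value; }

    @SuppressWarnings("unchecked")
    T getValue() { return (T) value; }

    @SuppressWarnings("unchecked")
    <X> Rec<X> replace(X newValue) {
        this.value = newValue;
        return (Rec<X>) this;
    }
}

final class AliasingRepro {
    /** Map operator that, like a reusing StreamMap, writes its result into the input record. */
    static <A, B> Rec<B> mapInPlace(Rec<A> in, Function<A, B> fn) {
        return in.replace(fn.apply(in.getValue()));
    }

    /** Buggy broadcast: both branches receive the SAME record instance. */
    static List<Object> broadcast(Rec<Integer> record) {
        List<Object> results = new ArrayList<>();
        results.add(mapInPlace(record, (Integer i) -> "v" + i).getValue()); // Integer -> String, in place
        results.add(mapInPlace(record, (Integer i) -> i * 2).getValue());   // now receives a String
        return results;
    }

    /** Fixed broadcast: each branch gets its own shallow copy of the record. */
    static List<Object> broadcastWithCopies(Rec<Integer> record) {
        List<Object> results = new ArrayList<>();
        results.add(mapInPlace(new Rec<>(record.getValue()), (Integer i) -> "v" + i).getValue());
        results.add(mapInPlace(new Rec<>(record.getValue()), (Integer i) -> i * 2).getValue());
        return results;
    }

    public static void main(String[] args) {
        System.out.println(broadcastWithCopies(new Rec<>(21))); // [v21, 42]
        try {
            broadcast(new Rec<>(21));
        } catch (ClassCastException e) {
            System.out.println("ClassCastException, as in the issue");
        }
    }
}
```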



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-3974) enableObjectReuse fails when an operator chains to multiple downstream operators

2016-06-27 ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15351196#comment-15351196
 ] 

ASF GitHub Bot commented on FLINK-3974:
---

Github user aljoscha closed the pull request at:

https://github.com/apache/flink/pull/2110




[jira] [Commented] (FLINK-3974) enableObjectReuse fails when an operator chains to multiple downstream operators

2016-06-27 ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15350557#comment-15350557
 ] 

ASF GitHub Bot commented on FLINK-3974:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2110
  
Aw man, I already had "rename to Test" in my commit but forgot to add that...

Thanks again, @tillrohrmann, I'll make the changes and merge.




[jira] [Commented] (FLINK-3974) enableObjectReuse fails when an operator chains to multiple downstream operators

2016-06-23 ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346488#comment-15346488
 ] 

ASF GitHub Bot commented on FLINK-3974:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2110
  
Excellent work @aljoscha. Great fix for the problem and it's also really 
nice that we could get rid of the IT case :-) +1 for merging after addressing 
my minor comments.




[jira] [Commented] (FLINK-3974) enableObjectReuse fails when an operator chains to multiple downstream operators

2016-06-23 ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346486#comment-15346486
 ] 

ASF GitHub Bot commented on FLINK-3974:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2110#discussion_r68241018
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ChainingITCase.java
 ---
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SplitStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorChain;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.hamcrest.Matchers.contains;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Tests for stream operator chaining behaviour.
+ */
+public class ChainingITCase {
--- End diff --

I think that this test case no longer needs to be an IT case.



[jira] [Commented] (FLINK-3974) enableObjectReuse fails when an operator chains to multiple downstream operators

2016-06-23 ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346484#comment-15346484
 ] 

ASF GitHub Bot commented on FLINK-3974:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2110#discussion_r68240257
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
 ---
@@ -386,4 +402,23 @@ public void close() {
}
}
}
+
+	/**
+	 * Special version of {@link BroadcastingOutputCollector} that performs a shallow copy of the
+	 * {@link StreamRecord} to ensure that multi-chaining works correctly.
+	 */
+	private static final class CopyingBroadcastingOutputCollector<T> extends BroadcastingOutputCollector<T> {
+
+		public CopyingBroadcastingOutputCollector(Output<StreamRecord<T>>[] outputs) {
+			super(outputs);
+		}
+
+		@Override
+		public void collect(StreamRecord<T> record) {
+			for (Output<StreamRecord<T>> output : outputs) {
+				StreamRecord<T> shallowCopy = record.copy(record.getValue());
+				output.collect(shallowCopy);
+			}
--- End diff --

Same here: I think we could save one copy operation by passing the original 
`record` to the last output.
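The suggestion to hand the original `record` to the last output can be checked with a small model. `Rec`, `Out` and `CopyingBroadcaster` are hypothetical stand-ins for `StreamRecord`, `Output` and `CopyingBroadcastingOutputCollector`, and the `copiesMade` counter exists only for illustration. The first output deliberately mutates the record it receives, as a reusing map would; the second output still sees the original value, and only one copy is made for two outputs.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for StreamRecord; copy() is shallow: new wrapper, same value reference. */
final class Rec<T> {
    private T value;
    private final long timestamp;

    Rec(T value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }

    T getValue() { return value; }
    void setValue(T value) { this.value = value; }
    Rec<T> copy(T newValue) { return new Rec<>(newValue, timestamp); }
}

interface Out<T> {
    void collect(Rec<T> record);
}

/** Every output but the last gets a shallow copy; the last one receives the original record. */
final class CopyingBroadcaster<T> implements Out<T> {
    private final Out<T>[] outputs;
    int copiesMade = 0; // illustration only

    @SafeVarargs
    CopyingBroadcaster(Out<T>... outputs) {
        this.outputs = outputs;
    }

    @Override
    public void collect(Rec<T> record) {
        for (int i = 0; i < outputs.length - 1; i++) {
            outputs[i].collect(record.copy(record.getValue()));
            copiesMade++;
        }
        if (outputs.length > 0) { // guard for the empty case, added in this sketch
            outputs[outputs.length - 1].collect(record);
        }
    }
}

final class BroadcastDemo {
    static List<Object> run() {
        List<Object> seen = new ArrayList<>();
        // The first output mutates whatever it receives, like a map operator that reuses its input.
        Out<Object> mutating = r -> {
            r.setValue("mutated");
            seen.add(r.getValue());
        };
        Out<Object> reading = r -> seen.add(r.getValue());

        CopyingBroadcaster<Object> broadcaster = new CopyingBroadcaster<>(mutating, reading);
        broadcaster.collect(new Rec<Object>(21, 0L));
        seen.add(broadcaster.copiesMade);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [mutated, 21, 1]
    }
}
```

With a single output the loop body never runs, so no copy is made at all, which matches the point that a non-branching chain can reuse the record.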




[jira] [Commented] (FLINK-3974) enableObjectReuse fails when an operator chains to multiple downstream operators

2016-06-23 ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346476#comment-15346476
 ] 

ASF GitHub Bot commented on FLINK-3974:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2110#discussion_r68239970
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/CopyingDirectedOutput.java
 ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.collector.selector;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.graph.StreamEdge;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.List;
+import java.util.Set;
+
+
+/**
+ * Special version of {@link DirectedOutput} that performs a shallow copy of the
+ * {@link StreamRecord} to ensure that multi-chaining works correctly.
+ */
+public class CopyingDirectedOutput<OUT> extends DirectedOutput<OUT> {
+
+	@SuppressWarnings({"unchecked", "rawtypes"})
+	public CopyingDirectedOutput(
+			List<OutputSelector<OUT>> outputSelectors,
+			List<Tuple2<Output<StreamRecord<OUT>>, StreamEdge>> outputs) {
+		super(outputSelectors, outputs);
+	}
+
+	@Override
+	public void collect(StreamRecord<OUT> record) {
+		Set<Output<StreamRecord<OUT>>> selectedOutputs = selectOutputs(record);
+
+		for (Output<StreamRecord<OUT>> out : selectedOutputs) {
+			StreamRecord<OUT> shallowCopy = record.copy(record.getValue());
+			out.collect(shallowCopy);
+		}
--- End diff --

Can't we save one copy operation by giving `record` to the last selected 
output?
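The same saving for the split()/select() path can be sketched as follows. Because the selected outputs arrive as a `Set`, the sketch walks it with an `Iterator` and uses `hasNext()` to recognise the last element. `CopyingDirected`, `Rec` and `Out` are hypothetical, and each output is chosen by its own `Predicate`, which is a simplification of Flink's name-based `OutputSelector`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

/** Hypothetical stand-in for StreamRecord; copy() is shallow. */
final class Rec<T> {
    private T value;
    private final long timestamp;

    Rec(T value, long timestamp) { this.value = value; this.timestamp = timestamp; }

    T getValue() { return value; }
    void setValue(T value) { this.value = value; }
    Rec<T> copy(T newValue) { return new Rec<>(newValue, timestamp); }
}

interface Out<T> { void collect(Rec<T> record); }

/** Hypothetical directed output: selector i decides whether output i receives the record. */
final class CopyingDirected<T> implements Out<T> {
    private final List<Predicate<T>> selectors;
    private final List<Out<T>> outputs;
    int copiesMade = 0; // illustration only

    CopyingDirected(List<Predicate<T>> selectors, List<Out<T>> outputs) {
        this.selectors = selectors;
        this.outputs = outputs;
    }

    private Set<Out<T>> selectOutputs(Rec<T> record) {
        Set<Out<T>> selected = new LinkedHashSet<>(); // keeps declaration order
        for (int i = 0; i < outputs.size(); i++) {
            if (selectors.get(i).test(record.getValue())) {
                selected.add(outputs.get(i));
            }
        }
        return selected;
    }

    @Override
    public void collect(Rec<T> record) {
        Iterator<Out<T>> it = selectOutputs(record).iterator();
        while (it.hasNext()) {
            Out<T> out = it.next();
            if (it.hasNext()) {
                out.collect(record.copy(record.getValue())); // not the last: own shallow copy
                copiesMade++;
            } else {
                out.collect(record); // last selected output reuses the original
            }
        }
    }
}

final class DirectedDemo {
    static List<Object> run(int value) {
        List<Object> seen = new ArrayList<>();
        Out<Integer> even = r -> {
            r.setValue(-1); // mutates its input, like a reusing operator
            seen.add("even:" + r.getValue());
        };
        Out<Integer> positive = r -> seen.add("positive:" + r.getValue());
        Out<Integer> large = r -> seen.add("large:" + r.getValue());
        Predicate<Integer> isEven = v -> v % 2 == 0;
        Predicate<Integer> isPositive = v -> v > 0;
        Predicate<Integer> isLarge = v -> v > 100;

        CopyingDirected<Integer> directed = new CopyingDirected<>(
                Arrays.asList(isEven, isPositive, isLarge), Arrays.asList(even, positive, large));
        directed.collect(new Rec<>(value, 0L));
        seen.add(directed.copiesMade);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run(42)); // [even:-1, positive:42, 1]
        System.out.println(run(7));  // [positive:7, 0]
    }
}
```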




[jira] [Commented] (FLINK-3974) enableObjectReuse fails when an operator chains to multiple downstream operators

2016-06-23 ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346340#comment-15346340
 ] 

ASF GitHub Bot commented on FLINK-3974:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2110
  
@tillrohrmann I pushed a commit that removes the per-operator object-reuse 
setting, refactors the broadcasting and directed outputs, and changes the ITCase 
to a plain test case. Happy reviewing.




[jira] [Commented] (FLINK-3974) enableObjectReuse fails when an operator chains to multiple downstream operators

2016-06-23 ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346162#comment-15346162
 ] 

ASF GitHub Bot commented on FLINK-3974:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2110
  
I'll try to come up with something. I'll probably remove 
`isInputCopyDisabled` from the operator and only allow a global setting for 
object reuse, which should simplify things.
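A single job-wide switch could be wired as in the sketch below, where the chaining output is chosen once from the global flag instead of querying each operator. The names (`ChainingOutputs.forOperator`, `Rec`, `Out`) are hypothetical, `deepCopy` stands in for whatever copy routine the value's serializer provides, and this is not the code that was eventually merged.

```java
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

/** Hypothetical stand-in for StreamRecord. */
final class Rec<T> {
    private final T value;
    private final long timestamp;

    Rec(T value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }

    T getValue() { return value; }
    Rec<T> copy(T newValue) { return new Rec<>(newValue, timestamp); }
}

interface Out<T> {
    void collect(Rec<T> record);
}

final class ChainingOutputs {
    /** Picks the output variant from ONE global flag rather than from a per-operator setting. */
    static <T> Out<T> forOperator(Consumer<Rec<T>> operator, UnaryOperator<T> deepCopy,
                                  boolean objectReuseEnabled) {
        if (objectReuseEnabled) {
            return operator::accept; // reuse: pass the record straight through
        }
        // No reuse: the operator receives a record holding its own deep copy of the value.
        return record -> operator.accept(record.copy(deepCopy.apply(record.getValue())));
    }
}

final class GlobalReuseDemo {
    /** Runs an operator that mutates its input and returns the caller's value afterwards. */
    static int run(boolean objectReuseEnabled) {
        Consumer<Rec<int[]>> mutatingOperator = r -> r.getValue()[0] = 99;
        Out<int[]> out = ChainingOutputs.forOperator(mutatingOperator, a -> a.clone(), objectReuseEnabled);

        int[] original = {1};
        out.collect(new Rec<>(original, 0L));
        return original[0];
    }

    public static void main(String[] args) {
        System.out.println("reuse on:  " + run(true));  // 99
        System.out.println("reuse off: " + run(false)); // 1
    }
}
```

With reuse enabled the operator's in-place write is visible to the caller; with reuse disabled it only affects the copy, at the price of one copy per hand-over.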




[jira] [Commented] (FLINK-3974) enableObjectReuse fails when an operator chains to multiple downstream operators

2016-06-23 ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346141#comment-15346141
 ] 

ASF GitHub Bot commented on FLINK-3974:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2110
  
Thanks for the thorough review, @tillrohrmann!

Your points are valid, and I may have to change this PR, but let me first 
explain my reasoning.

The shallow copy is performed in the one place that all code paths have to 
go through, because it is the point right before control is passed to the 
operator. Putting it anywhere else would mean placing it in 
`BroadcastingOutputCollector`, as you mentioned, and also in 
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java,
which is used when the user does a split()/select() operation 
(`DataStream.split()`). The number of places that need the copy might also 
grow in the future.

Also, putting it in `BroadcastingOutputCollector` and `DirectedOutput` 
would mean that we always do two copies per record for the common case of 
having object-copying enabled (which is the default).

About the ITCase: I also don't like having it in there, because we are 
approaching the 2-hour mark on Travis, but I think it is justified in this case. 
This test really verifies that the whole system works correctly when the user 
uses a certain feature (now that I think about it, I would also add a test for 
split()/select()).




[jira] [Commented] (FLINK-3974) enableObjectReuse fails when an operator chains to multiple downstream operators

2016-06-22 ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15344550#comment-15344550
 ] 

ASF GitHub Bot commented on FLINK-3974:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2110
  
Thanks for the contribution @wanderingbort and @aljoscha. I might be wrong, 
but maybe there is a slightly better place for the copying operation. 
Furthermore, I think that it would be beneficial if we could test the correct 
behaviour without instantiating an expensive IT case.




[jira] [Commented] (FLINK-3974) enableObjectReuse fails when an operator chains to multiple downstream operators

2016-06-22 ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15344541#comment-15344541
 ] 

ASF GitHub Bot commented on FLINK-3974:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2110#discussion_r68077418
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ChainingITCase.java
 ---
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.junit.Test;
+
+/**
+ * Tests for stream operator chaining behaviour.
+ */
+public class ChainingITCase extends StreamingMultipleProgramsTestBase {
--- End diff --

Do we have to instantiate an expensive IT case to test the correct chaining 
behaviour? Maybe we could save some cycles by not starting a complete Flink 
cluster for that and instead testing the chained task directly.




[jira] [Commented] (FLINK-3974) enableObjectReuse fails when an operator chains to multiple downstream operators

2016-06-22 ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15344537#comment-15344537
 ] 

ASF GitHub Bot commented on FLINK-3974:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2110#discussion_r68076977
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
 ---
@@ -306,8 +306,9 @@ public ChainingOutput(OneInputStreamOperator<T, ?> operator) {
 		@Override
 		public void collect(StreamRecord<T> record) {
 			try {
-				operator.setKeyContextElement1(record);
-				operator.processElement(record);
+				StreamRecord<T> shallowCopy = record.copy(record.getValue());
+				operator.setKeyContextElement1(shallowCopy);
+				operator.processElement(shallowCopy);
--- End diff --

Actually, I'm wondering whether `ChainingOutput` is the right place for this 
copy. Wouldn't it make more sense to do it in `BroadcastingOutputCollector`? 
Only when the chained data flow branches do we have to make sure that every 
downstream operator gets its own copy of the record. For simple chaining, it 
should be correct to reuse the stream record.

So I would adapt the `collect` method of `BroadcastingOutputCollector` as 
follows:

```
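public void collect(StreamRecord<T> record) {
    // every output except the last one gets its own shallow copy
    for (int i = 0; i < outputs.length - 1; i++) {
        StreamRecord<T> shallowCopy = record.copy(record.getValue());
        outputs[i].collect(shallowCopy);
    }

    // the last output can reuse the original record
    outputs[outputs.length - 1].collect(record);
}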
```
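To compare the cost of the two placements, here is a minimal sketch of the broadcasting variant proposed above, again with hypothetical stand-ins (`MiniRecord`, `Output`, `BroadcastingOutput`, `emitToBranches` and the `copyCount` counter are illustrative, not Flink code). With n branches it makes n - 1 shallow copies, while a simple chain that never goes through the broadcaster makes none.

```java
import java.util.ArrayList;
import java.util.List;

public class CopyInBroadcast {

    // Counts shallow copies so the cost of the strategy is visible.
    static int copyCount = 0;

    // Hypothetical stand-in for StreamRecord.
    static final class MiniRecord {
        Object value;

        MiniRecord(Object value) { this.value = value; }

        MiniRecord copy(Object valueCopy) {
            copyCount++;
            return new MiniRecord(valueCopy);
        }

        MiniRecord replace(Object newValue) {
            value = newValue;
            return this;
        }
    }

    interface Output {
        void collect(MiniRecord record);
    }

    // Same loop as the proposed collect(): copies for all outputs but the
    // last, which receives (and may mutate) the original record.
    static final class BroadcastingOutput implements Output {
        private final Output[] outputs;

        BroadcastingOutput(Output[] outputs) { this.outputs = outputs; }

        @Override
        public void collect(MiniRecord record) {
            for (int i = 0; i < outputs.length - 1; i++) {
                outputs[i].collect(record.copy(record.value));
            }
            outputs[outputs.length - 1].collect(record);
        }
    }

    // Builds n chained "map" outputs that record their input and then
    // replace() it with an Integer. A single output is called directly,
    // modelling a simple (non-branching) chain.
    static List<Object> emitToBranches(int n, Object value) {
        List<Object> seen = new ArrayList<>();
        Output[] outputs = new Output[n];
        for (int i = 0; i < n; i++) {
            outputs[i] = r -> {
                seen.add(r.value);
                r.replace(String.valueOf(r.value).length());
            };
        }
        Output out = (n == 1) ? outputs[0] : new BroadcastingOutput(outputs);
        out.collect(new MiniRecord(value));
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(emitToBranches(3, "hello") + " copies=" + copyCount);
        // prints: [hello, hello, hello] copies=2
    }
}
```

Every branch still sees the original value, but the copying cost is paid only where the chained flow actually splits.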




[jira] [Commented] (FLINK-3974) enableObjectReuse fails when an operator chains to multiple downstream operators

2016-06-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15333422#comment-15333422
 ] 

ASF GitHub Bot commented on FLINK-3974:
---

GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/2110

[FLINK-3974] Fix object reuse with multi-chaining

Before, a job would fail if object reuse was enabled and multiple
operators were chained to one upstream operator. Now, we always create a
shallow copy of the StreamRecord in OperatorChain.ChainingOutput because
downstream operations change/reuse the StreamRecord.

This fix was contributed by @wanderingbort (if this is the right GitHub 
handle) as a patch on the Flink Jira. I can change the commit to attribute it 
to him, but so far he hasn't responded to my question about this on Jira.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink chaining/fix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2110.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2110


commit 092f350cccbda32331f527c4eaf7ad3304fa1811
Author: Aljoscha Krettek 
Date:   2016-06-14T10:18:35Z

[FLINK-3974] Fix object reuse with multi-chaining

Before, a job would fail if object reuse was enabled and multiple
operators were chained to one upstream operator. Now, we always create a
shallow copy of the StreamRecord in OperatorChain.ChainingOutput because
downstream operations change/reuse the StreamRecord.






[jira] [Commented] (FLINK-3974) enableObjectReuse fails when an operator chains to multiple downstream operators

2016-06-14 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15329226#comment-15329226
 ] 

Aljoscha Krettek commented on FLINK-3974:
-

[~wanderingbort] do you want to open a PR with your fix or should I create a 
commit for you with the fix?




[jira] [Commented] (FLINK-3974) enableObjectReuse fails when an operator chains to multiple downstream operators

2016-05-27 Thread B Wyatt (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15304062#comment-15304062
 ] 

B Wyatt commented on FLINK-3974:


In the topologies I've been building, object reuse has been a pretty significant 
CPU win. It would be a shame to lose that capability for topos where care is 
taken to actually maintain the application-level requirement of object reuse: 
don't modify your inputs.

Disabling chaining is a fine option if this is not a bottleneck in your 
topology. If unchained splits become the default, I'd like to see the ability 
to chain them remain as an option. Sometimes splitting into multiple 
chains/threads is good: CPU-heavy operators that benefit from parallelism. 
Sometimes maintaining the chain to avoid the de/serialization costs is good: 
CPU-light operators with high throughput.

The attached patch essentially gives each collector its own dedicated stream 
record with a reference to the value. You are correct that it doesn't solve the 
problem of mutable operator inputs, but that has always been the primary 
concession of object reuse, and it doesn't seem like a bad requirement to put on 
the application level. I don't know that there is a way to avoid the cost of 
cloning *and* protect against operators mutating input in a language like Java.
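The limitation acknowledged here can be illustrated with a minimal sketch: a shallow copy gives each branch its own holder, but both holders still reference the same value object, so an operator that mutates its input in place is visible to the other branch. `MiniRecord` and `secondBranchSees` are hypothetical, and `StringBuilder` is just a convenient mutable value.

```java
import java.util.function.Consumer;

public class ShallowCopyLimits {

    // Hypothetical stand-in for StreamRecord.
    static final class MiniRecord {
        Object value;

        MiniRecord(Object value) { this.value = value; }

        // Shallow copy: new holder, same value reference.
        MiniRecord copy(Object valueCopy) { return new MiniRecord(valueCopy); }
    }

    // The first branch gets a shallow copy; the second branch keeps the
    // original holder. Returns what the second branch then observes.
    static String secondBranchSees(Consumer<StringBuilder> firstBranch) {
        MiniRecord original = new MiniRecord(new StringBuilder("event"));
        MiniRecord forFirstBranch = original.copy(original.value);
        firstBranch.accept((StringBuilder) forFirstBranch.value);
        return original.value.toString();
    }

    public static void main(String[] args) {
        // A well-behaved operator that only reads its input.
        System.out.println(secondBranchSees(sb -> sb.length()));          // event
        // An operator that violates "don't modify your inputs".
        System.out.println(secondBranchSees(sb -> sb.append("-mutated"))); // event-mutated
    }
}
```

Guarding against the second case would mean deep-copying the value itself, which is the cost object reuse is meant to avoid.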



[jira] [Commented] (FLINK-3974) enableObjectReuse fails when an operator chains to multiple downstream operators

2016-05-26 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15302127#comment-15302127
 ] 

Stephan Ewen commented on FLINK-3974:
-

Yes, that is a pretty clear bug.

I guess the best workaround for now is to disable the object reuse mode. Object 
reuse does not really work well in the DataStream API right now; it 
works pretty well in the DataSet API.

Another quick workaround is to not chain the two different map functions, 
using {{.disableChaining()}}.

The solution should be quite straightforward, though:
  - Do not chain "splitting" flows any more. I would actually like that 
solution. For splitting flows, it seems like a good heuristic to start a new 
chain/thread by default.
  - Each collector should use its own dedicated stream record. That would 
circumvent the ClassCastException at least, but would still be dangerous if the 
mappers actually alter the events.
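The first option can be pictured as a simple rule over the operator graph. This is a sketch of the proposed heuristic only, not Flink's actual chaining logic (which checks further conditions): the hypothetical `chainedEdges` keeps an edge chained only when its upstream operator has exactly one consumer, so the split point in the reported topology starts new chains.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SplitChainingHeuristic {

    // Hypothetical rule: never chain an operator whose output fans out to
    // several consumers. Returns the edges that would stay chained.
    static List<String> chainedEdges(Map<String, List<String>> consumers) {
        List<String> chained = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : consumers.entrySet()) {
            if (e.getValue().size() == 1) {
                chained.add(e.getKey() + "->" + e.getValue().get(0));
            }
        }
        return chained;
    }

    public static void main(String[] args) {
        // source -> map0 -> {mapA, mapB}, each followed by its own sink.
        Map<String, List<String>> topology = new LinkedHashMap<>();
        topology.put("source", List.of("map0"));
        topology.put("map0", List.of("mapA", "mapB")); // the splitting point
        topology.put("mapA", List.of("sinkA"));
        topology.put("mapB", List.of("sinkB"));
        System.out.println(chainedEdges(topology));
        // prints: [source->map0, mapA->sinkA, mapB->sinkB]
    }
}
```

Under this rule the two branches after `map0` run in their own chains, so no record is ever shared between sibling operators, at the price of the de/serialization cost discussed in the comments.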



[jira] [Commented] (FLINK-3974) enableObjectReuse fails when an operator chains to multiple downstream operators

2016-05-25 Thread B Wyatt (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15300642#comment-15300642
 ] 

B Wyatt commented on FLINK-3974:


After attempting to produce a minimal repro, I realized that there needs to be 
an operation between the source and the parallel maps:

{code:java}
DataStream input = ...

input = input
.map(MapFunction...);

input
.map(MapFunction...)
.addSink(...);

input
.map(MapFunction...)
.addSink(...);
{code}

See the attached repro case.

