[jira] [Commented] (FLINK-12113) User code passing to fromCollection(Iterator, Class) not cleaned
[ https://issues.apache.org/jira/browse/FLINK-12113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17410544#comment-17410544 ] Guozao Meng commented on FLINK-12113: I changed the `Iterator` to a normal `ArrayList` (or another `Collection`) and it worked, so it is the `Iterator` that causes this error.

> User code passing to fromCollection(Iterator, Class) not cleaned
>
>                 Key: FLINK-12113
>                 URL: https://issues.apache.org/jira/browse/FLINK-12113
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.7.2
>            Reporter: yankai zhang
>            Priority: Minor
>              Labels: auto-deprioritized-major, auto-unassigned
>         Attachments: image-2019-04-07-21-52-37-264.png, image-2019-04-08-23-19-27-359.png
>
> {code:java}
> interface IS extends Iterator, Serializable { }
>
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.fromCollection(new IS() {
>     @Override
>     public boolean hasNext() {
>         return false;
>     }
>
>     @Override
>     public Object next() {
>         return null;
>     }
> }, Object.class);
> {code}
>
> The code above throws this exception:
> {code:java}
> org.apache.flink.api.common.InvalidProgramException: The implementation of the SourceFunction is not serializable. The object probably contains or references non serializable fields.
>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
> {code}
>
> My workaround is to wrap the iterator instance in env.clean, like this:
> {code:java}
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.fromCollection(env.clean(new IS() {
>     @Override
>     public boolean hasNext() {
>         return false;
>     }
>
>     @Override
>     public Object next() {
>         return null;
>     }
> }), Object.class);
> {code}

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[ https://issues.apache.org/jira/browse/FLINK-12113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17333965#comment-17333965 ] Flink Jira Bot commented on FLINK-12113: This issue was marked "stale-assigned" and has not received an update in 7 days. It is now automatically unassigned. If you are still working on it, you can assign it to yourself again. Please also give an update about the status of the work.
[ https://issues.apache.org/jira/browse/FLINK-12113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17323316#comment-17323316 ] Flink Jira Bot commented on FLINK-12113: This issue is assigned but has not received an update in 7 days so it has been labeled "stale-assigned". If you are still working on the issue, please give an update and remove the label. If you are no longer working on the issue, please unassign so someone else may work on it. In 7 days the issue will be automatically unassigned.
[ https://issues.apache.org/jira/browse/FLINK-12113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17071758#comment-17071758 ] bruce qi commented on FLINK-12113: Yeah, we have now run into the same problem again in StreamExecutionEnvironment. Has this problem been solved?
[ https://issues.apache.org/jira/browse/FLINK-12113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16824898#comment-16824898 ] aitozi commented on FLINK-12113: Can we enable recursive cleaning in ClosureCleaner#clean by checking the class fields recursively? That way we would not have to take care of every wrapper function individually. We would only need to call clean at every entry point where a user function is registered.
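A recursive clean along these lines can be sketched in plain Java, without Flink. This is a hypothetical illustration, not Flink's ClosureCleaner. `recursiveClean` walks instance fields depth-first, using an identity-based visited set so that cycles terminate. It skips `java.*` classes, so it does not look inside JDK collections or arrays. It nulls any synthetic `this$`-prefixed field that points to a non-serializable object. `IteratorSource` is an invented stand-in for a wrapper such as _FromIteratorFunction_. A production cleaner would also have to confirm that the outer reference is never used before dropping it; this sketch assumes that and skips the check.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.Set;

// Hypothetical demo class; deliberately NOT Serializable, like a typical test class.
public class RecursiveCleanDemo {

    interface IS extends Iterator<Object>, Serializable { }

    // Hypothetical stand-in for a wrapper that holds the user's iterator.
    static final class IteratorSource implements Serializable {
        final Iterator<?> iterator;
        IteratorSource(Iterator<?> iterator) { this.iterator = iterator; }
    }

    // Created in an instance method, so it captures RecursiveCleanDemo.this as this$0.
    IS newIterator() {
        return new IS() {
            @Override public boolean hasNext() { return false; }
            @Override public Object next() { return null; }
        };
    }

    // Walks the object graph and nulls synthetic outer-instance fields (this$0, ...)
    // that point to non-serializable objects. ASSUMES those references are unused.
    static <T> T recursiveClean(T root) {
        Set<Object> visited = Collections.newSetFromMap(new IdentityHashMap<Object, Boolean>());
        clean(root, visited);
        return root;
    }

    private static void clean(Object obj, Set<Object> visited) {
        if (obj == null || !visited.add(obj)) {
            return; // nothing to do, or already visited (guards against cycles)
        }
        // Walk the class hierarchy, stopping at JDK classes, which this sketch does not inspect.
        for (Class<?> c = obj.getClass(); c != null && !c.getName().startsWith("java."); c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                int mods = f.getModifiers();
                if (Modifier.isStatic(mods) || Modifier.isTransient(mods) || f.getType().isPrimitive()) {
                    continue; // not serialized with the object, or cannot hold a reference
                }
                try {
                    f.setAccessible(true);
                    Object value = f.get(obj);
                    if (f.isSynthetic() && f.getName().startsWith("this$")) {
                        if (value != null && !(value instanceof Serializable)) {
                            f.set(obj, null); // drop the captured, non-serializable outer instance
                        }
                    } else {
                        clean(value, visited); // descend into nested objects
                    }
                } catch (IllegalAccessException e) {
                    throw new IllegalStateException(e);
                }
            }
        }
    }

    // Returns true if Java serialization of the object graph succeeds.
    static boolean serializes(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        IteratorSource source = new IteratorSource(new RecursiveCleanDemo().newIterator());
        System.out.println(serializes(source));                 // false: nested this$0
        System.out.println(serializes(recursiveClean(source))); // true after the recursive clean
    }
}
```

The wrapper is the object handed to the cleaner, but the problematic reference sits one level down inside the iterator. That nested reference is why a non-recursive pass over the wrapper alone is not enough.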
[ https://issues.apache.org/jira/browse/FLINK-12113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16816008#comment-16816008 ] yankai zhang commented on FLINK-12113: I'm not very familiar with Flink project development; maybe you could help fix this. Thanks.
[ https://issues.apache.org/jira/browse/FLINK-12113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16815286#comment-16815286 ] Guowei Ma commented on FLINK-12113: [~vision57] Do you want to fix this? If you don't have time, I think I could fix it. What do you think?
[ https://issues.apache.org/jira/browse/FLINK-12113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16813050#comment-16813050 ] Guowei Ma commented on FLINK-12113: Thanks for your explanation, [~vision57]. DataStream.flatMap/DataStream.process call clean, but StreamExecutionEnvironment does not. I think we could follow the DataStream API's approach here. [~aljoscha] Could you give some advice?
[ https://issues.apache.org/jira/browse/FLINK-12113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16812967#comment-16812967 ] yankai zhang commented on FLINK-12113: Yes, _fromCollection(Iterator, Class)_ works as expected without an anonymous class. The problem here is that an anonymous class instance created in an instance method implicitly references the outer _this_ (even though it never uses it), and the outer _this_ is not serializable. Removing such references is exactly what _StreamExecutionEnvironment#clean_ is supposed to do. In fact, the iterator passed by the user is wrapped in a _FromIteratorFunction_, and _StreamExecutionEnvironment#clean_ is then called on that wrapper instance, not on the iterator itself. However, the current implementation of _StreamExecutionEnvironment#clean_ is not recursive, so it cannot find and clean a _this_ reference nested deep inside the closure. Here is my fully reproducible code:
{code:java}
import java.io.Serializable;
import java.util.Iterator;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.Test;

public class MainTest {
    interface IS extends Iterator, Serializable { }

    @Test
    public void cleanTest() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Anonymous class inside an instance method: it implicitly captures MainTest.this
        env.fromCollection(new IS() {
            @Override
            public boolean hasNext() {
                return false;
            }

            @Override
            public Object next() {
                return null;
            }
        }, Object.class);
    }
}
{code}
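The wrapping problem described above can be reproduced without Flink. In this sketch, `topLevelClean` is a hypothetical, deliberately non-recursive cleaner: it only nulls synthetic `this$`-prefixed fields declared on the object it is handed. `IteratorSource` is an invented stand-in for _FromIteratorFunction_. Cleaning the wrapper misses the outer reference held by the nested iterator. Cleaning the iterator first and then wrapping it (the same order of operations as the `env.clean(...)` workaround in the description) removes it. The sketch assumes the anonymous class never uses its outer instance and does not verify this.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.lang.reflect.Field;
import java.util.Iterator;

// Hypothetical demo class; deliberately NOT Serializable.
public class WrapperCleanDemo {

    interface IS extends Iterator<Object>, Serializable { }

    // Hypothetical stand-in for a source function that wraps the user's iterator.
    static final class IteratorSource implements Serializable {
        final Iterator<?> iterator;
        IteratorSource(Iterator<?> iterator) { this.iterator = iterator; }
    }

    // Created in an instance method, so it captures WrapperCleanDemo.this as this$0.
    IS newIterator() {
        return new IS() {
            @Override public boolean hasNext() { return false; }
            @Override public Object next() { return null; }
        };
    }

    // Non-recursive cleaner: only nulls synthetic outer-instance fields declared
    // directly on the given object's class. ASSUMES those references are unused.
    static <T> T topLevelClean(T obj) {
        for (Field f : obj.getClass().getDeclaredFields()) {
            if (f.isSynthetic() && f.getName().startsWith("this$")) {
                try {
                    f.setAccessible(true);
                    f.set(obj, null);
                } catch (IllegalAccessException e) {
                    throw new IllegalStateException(e);
                }
            }
        }
        return obj;
    }

    // Returns true if Java serialization of the object graph succeeds.
    static boolean serializes(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        WrapperCleanDemo outer = new WrapperCleanDemo();
        // Clean only the wrapper: the this$0 inside the nested iterator survives.
        IteratorSource cleanedWrapper = topLevelClean(new IteratorSource(outer.newIterator()));
        // Clean the user's iterator first, then wrap it.
        IteratorSource cleanedIterator = new IteratorSource(topLevelClean(outer.newIterator()));
        System.out.println(serializes(cleanedWrapper));  // false
        System.out.println(serializes(cleanedIterator)); // true
    }
}
```

Cleaning at the point where the user object enters the API, before any wrapping, keeps a non-recursive cleaner sufficient for this case.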
[ https://issues.apache.org/jira/browse/FLINK-12113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16812502#comment-16812502 ] Guowei Ma commented on FLINK-12113: Hi [~vision57], I changed my code so that it does not use an anonymous class, but it still does not reproduce the problem. I am using JDK 8. !image-2019-04-08-23-19-27-359.png!
[ https://issues.apache.org/jira/browse/FLINK-12113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16812049#comment-16812049 ] yankai zhang commented on FLINK-12113: Interesting. I guessed that Java might apply some optimization that makes your anonymous class instance static, so that it holds no reference to an outer _this_. I found [an explanation on Stack Overflow|https://stackoverflow.com/a/758616/4281058]. Actually, there is no outer _this_ in your case, because your code runs in the static main method. Try putting your code into an instance method instead.
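The difference between a static and an instance context can be checked with the JDK alone. This sketch uses a hypothetical class, `OuterThisDemo`, which is deliberately not `Serializable`. It creates the same anonymous `Iterator` once inside an instance method and once inside a static method, then tries to serialize each one with `ObjectOutputStream`. Only the instance-method version carries the hidden reference to the outer object, so only that version is expected to fail.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Iterator;

// Hypothetical demo class; deliberately NOT Serializable, like a typical test class.
public class OuterThisDemo {

    interface IS extends Iterator<Object>, Serializable { }

    // Anonymous class created in an instance method: it gets a hidden reference
    // to OuterThisDemo.this, even though its body never uses it.
    IS fromInstanceMethod() {
        return new IS() {
            @Override public boolean hasNext() { return false; }
            @Override public Object next() { return null; }
        };
    }

    // Anonymous class created in a static method: there is no enclosing instance.
    static IS fromStaticMethod() {
        return new IS() {
            @Override public boolean hasNext() { return false; }
            @Override public Object next() { return null; }
        };
    }

    // Returns true if Java serialization of the object graph succeeds.
    static boolean serializes(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false; // a reachable object (here, the outer instance) is not Serializable
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("instance method: " + serializes(new OuterThisDemo().fromInstanceMethod())); // false
        System.out.println("static method:   " + serializes(fromStaticMethod()));                      // true
    }
}
```

This is why a reproduction written inside `static void main` can pass while the same code inside a test's instance method fails.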
[ https://issues.apache.org/jira/browse/FLINK-12113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16811892#comment-16811892 ] Guowei Ma commented on FLINK-12113: I can't reproduce your problem. !image-2019-04-07-21-52-37-264.png!