[jira] [Commented] (FLINK-6857) Add global default Kryo serializer configuration to StreamExecutionEnvironment
[ https://issues.apache.org/jira/browse/FLINK-6857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17334256#comment-17334256 ]

Flink Jira Bot commented on FLINK-6857:
---------------------------------------

This issue was marked "stale-assigned" and has not received an update in 7 days. It is now automatically unassigned. If you are still working on it, you can assign it to yourself again. Please also give an update about the status of the work.

> Add global default Kryo serializer configuration to StreamExecutionEnvironment
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-6857
>                 URL: https://issues.apache.org/jira/browse/FLINK-6857
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / Type Serialization System, Runtime / Configuration
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: zhangminglei
>            Priority: Major
>              Labels: pull-request-available, stale-assigned
>
> See ML for original discussion:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/KryoException-Encountered-unregistered-class-ID-td13476.html.
> We should have an additional {{setDefaultKryoSerializer}} method that allows
> overriding the global default serializer that is not tied to specific classes
> (out-of-the-box Kryo uses the {{FieldSerializer}} if no matches for default
> serializer settings can be found for a class). Internally in Flink's
> {{KryoSerializer}}, this would only be a matter of proxying that configured
> global default serializer for Kryo by calling
> {{Kryo.setDefaultSerializer(...)}} on the created Kryo instance.

--
This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-6857) Add global default Kryo serializer configuration to StreamExecutionEnvironment
[ https://issues.apache.org/jira/browse/FLINK-6857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17323643#comment-17323643 ]

Flink Jira Bot commented on FLINK-6857:
---------------------------------------

This issue is assigned but has not received an update in 7 days so it has been labeled "stale-assigned". If you are still working on the issue, please give an update and remove the label. If you are no longer working on the issue, please unassign so someone else may work on it. In 7 days the issue will be automatically unassigned.
[jira] [Commented] (FLINK-6857) Add global default Kryo serializer configuration to StreamExecutionEnvironment
[ https://issues.apache.org/jira/browse/FLINK-6857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16664270#comment-16664270 ]

ASF GitHub Bot commented on FLINK-6857:
---------------------------------------

zentol closed pull request #4166: [FLINK-6857] [types] Add global default Kryo serializer configuration…
URL: https://github.com/apache/flink/pull/4166

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 26e6af1e945..1b02176fe5d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -18,7 +18,9 @@

 package org.apache.flink.api.common;

+import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.Serializer;
+
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
@@ -146,6 +148,8 @@
      */
     private long taskCancellationTimeoutMillis = -1;

+    private Kryo kryo = new Kryo();
+
     // --- User code values

     private GlobalJobParameters globalJobParameters;
@@ -678,7 +682,8 @@ public void setGlobalJobParameters(GlobalJobParameters globalJobParameters) {
      * Adds a new Kryo default serializer to the Runtime.
      *
      * Note that the serializer instance must be serializable (as defined by java.io.Serializable),
-     * because it may be distributed to the worker nodes by java serialization.
+     * because it may be distributed to the worker nodes by java serialization. Also, this method
+     * can only tied to specific class which correspond to the addDefaultSerializer method in Kryo.
      *
      * @param type The class of the types serialized with the given serializer.
      * @param serializer The serializer to use.
@@ -694,6 +699,9 @@ public void setGlobalJobParameters(GlobalJobParameters globalJobParameters) {
     /**
      * Adds a new Kryo default serializer to the Runtime.
      *
+     * Note that this method can only tied to specific class which correspond
+     * to the addDefaultSerializer method in Kryo.
+     *
      * @param type The class of the types serialized with the given serializer.
      * @param serializerClass The class of the serializer to use.
      */
@@ -704,6 +712,25 @@ public void addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?
[...]
+    public void setDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass) {
+        if (type == null || serializerClass == null) {
+            throw new NullPointerException("Cannot register null class or serializer.");
+        }
+        kryo.newInstance(type);
+        kryo.setDefaultSerializer(serializerClass);
+        defaultKryoSerializerClasses.put(type, serializerClass);
+    }
+
     /**
      * Registers the given type with a Kryo Serializer.
      *
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 46c821edfa2..b4f328a8907 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -559,6 +559,22 @@ public void addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?
[...]
+     * Note that this method is different from {@link #addDefaultKryoSerializer(Class, Class)},
+     * you can specify your own serializer class to use when no {@link #addDefaultKryoSerializer(Class, Class)
+     * default serializers} match an object's type.
+     *
+     * @param type
+     *         The class of the types serialized with the given serializer.
+     * @param serializerClass
+     *         The class of the serializer to use.
+     */
+    public void setDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass) {
+        config.setDefaultKryoSerializer(type, serializerClass);
+    }
+
     /**
      * Registers the given type with a Kryo Serializer.
      *

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please
[jira] [Commented] (FLINK-6857) Add global default Kryo serializer configuration to StreamExecutionEnvironment
[ https://issues.apache.org/jira/browse/FLINK-6857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16087148#comment-16087148 ]

ASF GitHub Bot commented on FLINK-6857:
---------------------------------------

Github user zhangminglei commented on the issue:

    https://github.com/apache/flink/pull/4166

    @tzulitai Sorry for the delay. Do you mean that I should first add support for the `Kryo.setDefaultSerializer(...)` method in `ExecutionConfig`, and that this is the solution for this issue? Do I understand correctly? If I am wrong, please help me out. Thank you!
[jira] [Commented] (FLINK-6857) Add global default Kryo serializer configuration to StreamExecutionEnvironment
[ https://issues.apache.org/jira/browse/FLINK-6857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16060515#comment-16060515 ]

ASF GitHub Bot commented on FLINK-6857:
---------------------------------------

Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/4166

    @zhangminglei this does not solve the issue.

    The expected fix is to pass the new setting to `ExecutionConfig`, which will then be used when creating a `KryoSerializer` (see `GenericTypeInfo`). The `KryoSerializer` needs to respect this setting and appropriately configure the `kryo` instance when instantiating it.
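
The flow described in this comment can be sketched in plain Java without Flink or Kryo on the classpath. This is a minimal sketch under stated assumptions, not the actual Flink implementation: `ToyKryo`, `ToyExecutionConfig`, and `ToyKryoSerializer` are hypothetical stand-ins for `Kryo`, `ExecutionConfig`, and `KryoSerializer`, and the serializer is represented only by a class name string. The point it illustrates is that the config stores just the setting, and the serializer applies it to every Kryo instance it creates.

```java
import java.io.Serializable;
import java.util.Objects;

/** Toy model of the requested behavior; all nested types are hypothetical stand-ins. */
public class ConfigPropagationSketch {

    /** Stand-in for Kryo: remembers which global default serializer it was given. */
    static final class ToyKryo {
        // Models the out-of-the-box fallback mentioned in the issue.
        String defaultSerializerName = "FieldSerializer";

        void setDefaultSerializer(String serializerClassName) {
            this.defaultSerializerName = serializerClassName;
        }
    }

    /** Stand-in for ExecutionConfig: holds the setting only, never a Kryo instance. */
    static final class ToyExecutionConfig implements Serializable {
        private static final long serialVersionUID = 1L;

        // null means "keep the built-in default".
        private String globalDefaultSerializerName;

        void setDefaultKryoSerializer(String serializerClassName) {
            this.globalDefaultSerializerName =
                Objects.requireNonNull(serializerClassName, "serializer class name");
        }

        String getDefaultKryoSerializer() {
            return globalDefaultSerializerName;
        }
    }

    /** Stand-in for KryoSerializer: configures each instance it creates from the config. */
    static final class ToyKryoSerializer {
        private final ToyExecutionConfig config;

        ToyKryoSerializer(ToyExecutionConfig config) {
            this.config = config;
        }

        ToyKryo createKryo() {
            ToyKryo kryo = new ToyKryo();
            String configured = config.getDefaultKryoSerializer();
            if (configured != null) {
                // The proxying step: forward the configured global default.
                kryo.setDefaultSerializer(configured);
            }
            return kryo;
        }
    }

    public static void main(String[] args) {
        ToyExecutionConfig config = new ToyExecutionConfig();
        // "com.example.MyFallbackSerializer" is a made-up class name for illustration.
        config.setDefaultKryoSerializer("com.example.MyFallbackSerializer");

        ToyKryoSerializer serializer = new ToyKryoSerializer(config);
        System.out.println(serializer.createKryo().defaultSerializerName);
    }
}
```

In this model, configuring a `Kryo` object that lives inside the config would have no effect on the instances that `createKryo()` builds later, which is one way to read why the setting has to travel through the config and be applied at instantiation time.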
[jira] [Commented] (FLINK-6857) Add global default Kryo serializer configuration to StreamExecutionEnvironment
[ https://issues.apache.org/jira/browse/FLINK-6857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16059211#comment-16059211 ]

mingleizhang commented on FLINK-6857:
-------------------------------------

Hi [~tzulitai], I have opened a PR for this issue. Could you review it? :D
[jira] [Commented] (FLINK-6857) Add global default Kryo serializer configuration to StreamExecutionEnvironment
[ https://issues.apache.org/jira/browse/FLINK-6857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16059208#comment-16059208 ]

ASF GitHub Bot commented on FLINK-6857:
---------------------------------------

GitHub user zhangminglei opened a pull request:

    https://github.com/apache/flink/pull/4166

    [FLINK-6857] [types] Add global default Kryo serializer configuration to StreamExecutionEnvironment

    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
    If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
    In addition to going through the list, please provide a meaningful description of your changes.

    - [ ] General
      - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)

    - [ ] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added

    - [ ] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zhangminglei/flink flink-6857

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4166.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4166

commit f8fbab7640c0713487de6eefa01e03597aea47cc
Author: zhangminglei
Date:   2017-06-22T11:28:39Z

    [FLINK-6857] [types] Add global default Kryo serializer configuration to StreamExecutionEnvironment
[jira] [Commented] (FLINK-6857) Add global default Kryo serializer configuration to StreamExecutionEnvironment
[ https://issues.apache.org/jira/browse/FLINK-6857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16042866#comment-16042866 ]

Tzu-Li (Gordon) Tai commented on FLINK-6857:
--------------------------------------------

The JIRA would therefore also entail adding the configuration to the {{ExecutionConfig}}.
[jira] [Commented] (FLINK-6857) Add global default Kryo serializer configuration to StreamExecutionEnvironment
[ https://issues.apache.org/jira/browse/FLINK-6857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16042864#comment-16042864 ]

Tzu-Li (Gordon) Tai commented on FLINK-6857:
--------------------------------------------

[~StephanEwen] no, the current {{addDefaultKryoSerializer}} methods are defaults tied to specific classes, and correspond to the {{addDefaultSerializer(Class, Serializer)}} methods in Kryo.

This JIRA is meant to add a {{setDefaultSerializer}} configuration, which allows overriding the hard default (i.e., when no registered defaults for a class can be found), which in Kryo is the {{FieldSerializer}}. This would correspond to Kryo's {{setDefaultSerializer(Serializer)}} API.
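
The distinction drawn in this comment, class-specific defaults versus one global fallback, can be illustrated with a small stand-alone model. This is a sketch only: `SerializerLookupSketch` and `NamedSerializer` are hypothetical names, the lookup is a simplified assumption (first registered default whose type is assignable from the requested class, otherwise the global fallback), and Kryo's real resolution has more cases than are modeled here.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of serializer resolution; NOT Kryo's actual implementation. */
public class SerializerLookupSketch {

    /** Hypothetical stand-in for a serializer; it only carries a name. */
    static final class NamedSerializer {
        final String name;

        NamedSerializer(String name) {
            this.name = name;
        }
    }

    // Class-specific defaults, analogous to addDefaultSerializer(Class, Serializer).
    private final Map<Class<?>, NamedSerializer> classDefaults = new LinkedHashMap<>();

    // Global fallback, analogous to setDefaultSerializer(...); starts as "FieldSerializer".
    private NamedSerializer globalDefault = new NamedSerializer("FieldSerializer");

    void addDefaultSerializer(Class<?> type, NamedSerializer serializer) {
        classDefaults.put(type, serializer);
    }

    void setDefaultSerializer(NamedSerializer serializer) {
        globalDefault = serializer;
    }

    /** Returns the first matching class-specific default, else the global fallback. */
    NamedSerializer resolve(Class<?> type) {
        for (Map.Entry<Class<?>, NamedSerializer> entry : classDefaults.entrySet()) {
            if (entry.getKey().isAssignableFrom(type)) {
                return entry.getValue();
            }
        }
        return globalDefault;
    }

    public static void main(String[] args) {
        SerializerLookupSketch lookup = new SerializerLookupSketch();
        lookup.addDefaultSerializer(List.class, new NamedSerializer("ListSerializer"));

        // ArrayList matches the List default; String falls through to the global fallback.
        System.out.println(lookup.resolve(ArrayList.class).name);
        System.out.println(lookup.resolve(String.class).name);

        // Overriding the fallback changes the result only for unmatched classes.
        lookup.setDefaultSerializer(new NamedSerializer("CustomFallback"));
        System.out.println(lookup.resolve(String.class).name);
        System.out.println(lookup.resolve(ArrayList.class).name);
    }
}
```

Under these assumptions, the existing {{addDefaultKryoSerializer}} methods would map to `addDefaultSerializer` in the model, while the setting requested in this issue would map to `setDefaultSerializer`.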
[jira] [Commented] (FLINK-6857) Add global default Kryo serializer configuration to StreamExecutionEnvironment
[ https://issues.apache.org/jira/browse/FLINK-6857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16042855#comment-16042855 ]

Stephan Ewen commented on FLINK-6857:
-------------------------------------

This exists in the {{ExecutionConfig}}, which is per {{StreamExecutionEnvironment}}.

https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java#L686

Does that cover this case?
[jira] [Commented] (FLINK-6857) Add global default Kryo serializer configuration to StreamExecutionEnvironment
[ https://issues.apache.org/jira/browse/FLINK-6857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16039960#comment-16039960 ]

mingleizhang commented on FLINK-6857:
-------------------------------------

+1 :)