[GitHub] flink pull request: [FLINK-3313] [kafka] Fix concurrent TypeInform...
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1577#discussion_r51717612

    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java ---
    @@ -199,8 +200,10 @@ public void close() {
         FetchPartition[] partitions = partitionsList.toArray(new FetchPartition[partitionsList.size()]);
    +    final KeyedDeserializationSchema clonedDeserializer = InstantiationUtil.clone(deserializer, Thread.currentThread().getContextClassLoader());
    --- End diff --

    This classloader will not necessarily be able to deserialize the object. See comment above...

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
[GitHub] flink pull request: [FLINK-3313] [kafka] Fix concurrent TypeInform...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/1577
[GitHub] flink pull request: [FLINK-3313] [kafka] Fix concurrent TypeInform...
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1577#discussion_r51717545

    --- Diff: flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java ---
    @@ -298,6 +299,21 @@ public static void writeObjectToConfig(Object o, Configuration config, String ke
                 return baos.toByteArray();
             }
         }
    +
    +    /**
    +     * Clones the given serializable object using Java serialization.
    +     *
    +     * @param obj Object to clone
    +     * @param classLoader class loader to use to deserialize the object
    +     * @param <T> Type of the object to clone
    +     * @return Cloned object
    +     * @throws IOException
    +     * @throws ClassNotFoundException
    +     */
    +    public static <T extends Serializable> T clone(T obj, ClassLoader classLoader) throws IOException, ClassNotFoundException {
    --- End diff --

    I think we can drop the classloader here and simply use the classloader of the original object: `return deserializeObject(serializedObject, obj.getClass().getClassLoader());`
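A minimal sketch of the suggestion above, assuming plain Java serialization: the object is written to a byte array and read back through an `ObjectInputStream` that resolves classes with the class loader of the original object. The names `CloneSketch`, `ClassLoaderObjectInputStream`, and `cloneWithOwnClassLoader` are hypothetical and only illustrate the idea; this is not the code of Flink's `InstantiationUtil`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

// Hypothetical helper class, not part of Flink.
final class CloneSketch {

    /** ObjectInputStream that first tries to resolve classes through a given class loader. */
    static final class ClassLoaderObjectInputStream extends ObjectInputStream {
        private final ClassLoader classLoader;

        ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException {
            super(in);
            this.classLoader = classLoader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
            if (classLoader != null) {
                try {
                    return Class.forName(desc.getName(), false, classLoader);
                } catch (ClassNotFoundException e) {
                    // Fall through to the default resolution (covers primitive types, for example).
                }
            }
            return super.resolveClass(desc);
        }
    }

    /** Clones obj via Java serialization, deserializing with the class loader that loaded obj's class. */
    static <T extends Serializable> T cloneWithOwnClassLoader(T obj) throws IOException, ClassNotFoundException {
        if (obj == null) {
            return null;
        }
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        // getClassLoader() returns null for classes loaded by the bootstrap loader;
        // the stream above then uses the default resolution.
        ClassLoader classLoader = obj.getClass().getClassLoader();
        try (ObjectInputStream in =
                new ClassLoaderObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()), classLoader)) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
    }
}
```

The caller no longer has to pick a class loader: the loader that defined the object's class is, by construction, able to load that class again during deserialization, whereas the thread's context class loader carries no such guarantee.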
[GitHub] flink pull request: [FLINK-3313] [kafka] Fix concurrent TypeInform...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1577#discussion_r51718424

    --- Diff: flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java ---
    @@ -298,6 +299,21 @@ public static void writeObjectToConfig(Object o, Configuration config, String ke
                 return baos.toByteArray();
             }
         }
    +
    +    /**
    +     * Clones the given serializable object using Java serialization.
    +     *
    +     * @param obj Object to clone
    +     * @param classLoader class loader to use to deserialize the object
    +     * @param <T> Type of the object to clone
    +     * @return Cloned object
    +     * @throws IOException
    +     * @throws ClassNotFoundException
    +     */
    +    public static <T extends Serializable> T clone(T obj, ClassLoader classLoader) throws IOException, ClassNotFoundException {
    --- End diff --

    Yes, you're right, Stephan. Thanks for spotting it :+1:
[GitHub] flink pull request: [FLINK-3313] [kafka] Fix concurrent TypeInform...
Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/1577#issuecomment-179231858 Addressed all comments and will merge this PR.
[GitHub] flink pull request: [FLINK-3313] [kafka] Fix concurrent TypeInform...
GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/1577

    [FLINK-3313] [kafka] Fix concurrent TypeInformationSerializationSchema usage in LegacyFetcher

    The LegacyFetcher used the given KeyedDeserializationSchema across multiple threads even though it is not thread-safe. This commit fixes the problem by cloning the KeyedDeserializationSchema before giving it to the SimpleConsumerThread.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink fixLegacyFetcher

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1577.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1577

----
commit 8b46eb1518a46edc518474f4a1bbc9425cc708d9
Author: Till Rohrmann
Date:   2016-02-02T16:38:21Z

    [FLINK-3313] [kafka] Fix TypeInformationSerializationSchema usage in LegacyFetcher

    The LegacyFetcher used the given KeyedDeserializationSchema across multiple threads even though it is not thread-safe. This commit fixes the problem by cloning the KeyedDeserializationSchema before giving it to the SimpleConsumerThread.
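The approach described in this pull request can be sketched roughly as follows: instead of handing one shared, non-thread-safe schema to several consumer threads, each thread receives its own copy, created through Java serialization before the thread starts. Everything in this sketch is a hypothetical stand-in (`Schema`, `ReusingSchema`, `cloneViaSerialization`, `runWorkers`); it does not reproduce Flink's `KeyedDeserializationSchema`, `LegacyFetcher`, or `SimpleConsumerThread`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration, not Flink code.
final class PerThreadCloneSketch {

    /** Stand-in for a serializable deserialization schema. */
    interface Schema extends Serializable {
        String deserialize(byte[] message);
    }

    /** Not thread-safe: every call reuses the same mutable buffer. */
    static final class ReusingSchema implements Schema {
        private final StringBuilder reuse = new StringBuilder();

        @Override
        public String deserialize(byte[] message) {
            reuse.setLength(0);
            reuse.append(new String(message, StandardCharsets.UTF_8));
            return reuse.toString();
        }
    }

    /** Deep-copies a serializable object by writing it out and reading it back. */
    static <T extends Serializable> T cloneViaSerialization(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
    }

    /** Starts numThreads workers, each using a private clone of the given schema. */
    static List<String> runWorkers(Schema shared, int numThreads) throws Exception {
        List<String> results = Collections.synchronizedList(new ArrayList<>());
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < numThreads; i++) {
            // Clone before the thread starts, so no two threads touch the same instance.
            final Schema own = cloneViaSerialization(shared);
            final byte[] message = ("partition-" + i).getBytes(StandardCharsets.UTF_8);
            Thread worker = new Thread(() -> results.add(own.deserialize(message)));
            threads.add(worker);
            worker.start();
        }
        for (Thread worker : threads) {
            worker.join();
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runWorkers(new ReusingSchema(), 4));
    }
}
```

Cloning trades a small amount of memory and startup work for not having to synchronize the hot deserialization path, which is a reasonable choice when the schema's internal state is cheap to duplicate.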
[GitHub] flink pull request: [FLINK-3313] [kafka] Fix concurrent TypeInform...
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/1577#issuecomment-178683735 Good catch & fix! +1 to merge asap (too many tests are failing on this one right now)
[GitHub] flink pull request: [FLINK-3313] [kafka] Fix concurrent TypeInform...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1577#discussion_r51597662

    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java ---
    @@ -199,8 +200,11 @@ public void close() {
         FetchPartition[] partitions = partitionsList.toArray(new FetchPartition[partitionsList.size()]);
    +    @SuppressWarnings("unchecked")
    +    final KeyedDeserializationSchema clonedDeserializer = (KeyedDeserializationSchema)SerializationUtils.clone(deserializer);
    --- End diff --

    Good catch
[GitHub] flink pull request: [FLINK-3313] [kafka] Fix concurrent TypeInform...
Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/1577#issuecomment-178685691 Thanks for the review. Will merge it once Travis gives green light.
[GitHub] flink pull request: [FLINK-3313] [kafka] Fix concurrent TypeInform...
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1577#discussion_r51595056

    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java ---
    @@ -199,8 +200,11 @@ public void close() {
         FetchPartition[] partitions = partitionsList.toArray(new FetchPartition[partitionsList.size()]);
    +    @SuppressWarnings("unchecked")
    +    final KeyedDeserializationSchema clonedDeserializer = (KeyedDeserializationSchema)SerializationUtils.clone(deserializer);
    --- End diff --

    whitespace after cast missing
[GitHub] flink pull request: [FLINK-3313] [kafka] Fix concurrent TypeInform...
Github user uce commented on the pull request: https://github.com/apache/flink/pull/1577#issuecomment-178680229 Good catch! :-)