[GitHub] flink pull request: [FLINK-3313] [kafka] Fix concurrent TypeInform...

2016-02-03 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1577#discussion_r51717612
  
--- Diff: flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java ---
@@ -199,8 +200,10 @@ public void close() {

FetchPartition[] partitions = partitionsList.toArray(new FetchPartition[partitionsList.size()]);

+   final KeyedDeserializationSchema clonedDeserializer = InstantiationUtil.clone(deserializer, Thread.currentThread().getContextClassLoader());
--- End diff --

This classloader will not necessarily be able to deserialize the object. 
See comment above...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3313] [kafka] Fix concurrent TypeInform...

2016-02-03 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1577




[GitHub] flink pull request: [FLINK-3313] [kafka] Fix concurrent TypeInform...

2016-02-03 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1577#discussion_r51717545
  
--- Diff: flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java ---
@@ -298,6 +299,21 @@ public static void writeObjectToConfig(Object o, Configuration config, String ke
return baos.toByteArray();
}
}
+
+   /**
+    * Clones the given serializable object using Java serialization.
+    *
+    * @param obj Object to clone
+    * @param classLoader class loader to use to deserialize the object
+    * @param <T> Type of the object to clone
+    * @return Cloned object
+    * @throws IOException
+    * @throws ClassNotFoundException
+    */
+   public static <T extends Serializable> T clone(T obj, ClassLoader classLoader) throws IOException, ClassNotFoundException {
--- End diff --

I think we can drop the classloader here and simply use the classloader of 
the original object:

`return deserializeObject(serializedObject, obj.getClass().getClassLoader());`
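Stephan's suggestion can be illustrated in plain Java. The following is a minimal, self-contained sketch, not Flink's actual `InstantiationUtil` code: the names `CloneUtil`, `LoaderAwareObjectInputStream` and the demo type `Config` are hypothetical. It serializes the object and reads it back through an `ObjectInputStream` whose `resolveClass` uses `obj.getClass().getClassLoader()`, falling back to the default resolution when that loader is `null` (classes defined by the bootstrap loader, such as `java.util.ArrayList`, report a `null` loader).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

final class CloneUtil {

    private CloneUtil() {}

    /** ObjectInputStream that resolves class names against an explicit class loader. */
    private static final class LoaderAwareObjectInputStream extends ObjectInputStream {
        private final ClassLoader classLoader;

        LoaderAwareObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException {
            super(in);
            this.classLoader = classLoader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            if (classLoader == null) {
                // Bootstrap-loaded classes have no loader object; use default resolution.
                return super.resolveClass(desc);
            }
            return Class.forName(desc.getName(), false, classLoader);
        }
    }

    /** Deep-copies obj via Java serialization, using the loader that defined obj's class. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T clone(T obj) throws IOException, ClassNotFoundException {
        if (obj == null) {
            return null;
        }
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        ClassLoader loader = obj.getClass().getClassLoader();
        try (ObjectInputStream in = new LoaderAwareObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()), loader)) {
            return (T) in.readObject();
        }
    }

    /** Hypothetical serializable type, used only for the demo below. */
    static final class Config implements Serializable {
        private static final long serialVersionUID = 1L;
        final String topic;

        Config(String topic) {
            this.topic = topic;
        }
    }

    public static void main(String[] args) throws Exception {
        Config original = new Config("topic-a");
        Config copy = clone(original);
        // A distinct instance carrying the same state.
        System.out.println(copy != original && copy.topic.equals(original.topic)); // prints true
    }
}
```

One caveat of this design: only classes visible from the object's own loader can be resolved, so an object whose fields reference types defined by a different (for example, child) loader could still fail to deserialize.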




[GitHub] flink pull request: [FLINK-3313] [kafka] Fix concurrent TypeInform...

2016-02-03 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1577#discussion_r51718424
  
--- Diff: flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java ---
@@ -298,6 +299,21 @@ public static void writeObjectToConfig(Object o, Configuration config, String ke
return baos.toByteArray();
}
}
+
+   /**
+    * Clones the given serializable object using Java serialization.
+    *
+    * @param obj Object to clone
+    * @param classLoader class loader to use to deserialize the object
+    * @param <T> Type of the object to clone
+    * @return Cloned object
+    * @throws IOException
+    * @throws ClassNotFoundException
+    */
+   public static <T extends Serializable> T clone(T obj, ClassLoader classLoader) throws IOException, ClassNotFoundException {
--- End diff --

Yes, you're right, Stephan. Thanks for spotting it :+1:




[GitHub] flink pull request: [FLINK-3313] [kafka] Fix concurrent TypeInform...

2016-02-03 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1577#issuecomment-179231858
  
Addressed all comments and will merge this PR.




[GitHub] flink pull request: [FLINK-3313] [kafka] Fix concurrent TypeInform...

2016-02-02 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/1577

[FLINK-3313] [kafka] Fix concurrent TypeInformationSerializationSchema usage in LegacyFetcher

The LegacyFetcher used the given KeyedDeserializationSchema across multiple
threads even though it is not thread-safe. This commit fixes the problem by
cloning the KeyedDeserializationSchema before giving it to the
SimpleConsumerThread.
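The idea behind the fix, giving each consumer thread its own deep copy instead of sharing one schema instance, can be sketched as follows. This is a hypothetical illustration, not the LegacyFetcher code: `PerThreadSchemaDemo`, `ReusingSchema`, `copyOf` and `consume` are invented names, and `ReusingSchema` merely stands in for a schema that keeps mutable per-call state (an assumed reason for it not being thread-safe). For brevity, `copyOf` uses the default `ObjectInputStream` class resolution, which is exactly the class-loader question raised in the review.

```java
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

final class PerThreadSchemaDemo {

    /** Hypothetical schema that reuses a mutable buffer, so it is NOT thread-safe. */
    static final class ReusingSchema implements Serializable {
        private static final long serialVersionUID = 1L;
        private final StringBuilder buffer = new StringBuilder();

        String deserialize(byte[] message) {
            buffer.setLength(0); // resetting shared state races if two threads call this
            buffer.append(new String(message, StandardCharsets.UTF_8));
            return buffer.toString();
        }
    }

    /** Deep copy via Java serialization (default class resolution, for brevity). */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T copyOf(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    /** Starts one consumer thread per partition, each with its own schema copy. */
    static List<String> consume(ReusingSchema schema, List<byte[]> partitions) throws Exception {
        ConcurrentLinkedQueue<String> results = new ConcurrentLinkedQueue<>();
        List<Thread> threads = new ArrayList<>();
        for (byte[] partition : partitions) {
            ReusingSchema perThread = copyOf(schema); // never shared across threads
            Thread t = new Thread(() -> results.add(perThread.deserialize(partition)));
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            t.join();
        }
        return new ArrayList<>(results);
    }

    public static void main(String[] args) throws Exception {
        List<byte[]> partitions = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            partitions.add(("record-" + i).getBytes(StandardCharsets.UTF_8));
        }
        List<String> out = consume(new ReusingSchema(), partitions);
        System.out.println(out.size()); // prints 4
    }
}
```

Cloning once per thread keeps the per-record path free of locking; the cost is one serialization round trip per consumer thread at startup.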

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink fixLegacyFetcher

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1577.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1577


commit 8b46eb1518a46edc518474f4a1bbc9425cc708d9
Author: Till Rohrmann 
Date:   2016-02-02T16:38:21Z

[FLINK-3313] [kafka] Fix TypeInformationSerializationSchema usage in LegacyFetcher

The LegacyFetcher used the given KeyedDeserializationSchema across multiple
threads even though it is not thread-safe. This commit fixes the problem by
cloning the KeyedDeserializationSchema before giving it to the
SimpleConsumerThread.






[GitHub] flink pull request: [FLINK-3313] [kafka] Fix concurrent TypeInform...

2016-02-02 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1577#issuecomment-178683735
  
Good catch & fix!
+1 to merge asap (too many tests are failing on this one right now)




[GitHub] flink pull request: [FLINK-3313] [kafka] Fix concurrent TypeInform...

2016-02-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1577#discussion_r51597662
  
--- Diff: flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java ---
@@ -199,8 +200,11 @@ public void close() {

FetchPartition[] partitions = partitionsList.toArray(new FetchPartition[partitionsList.size()]);

+   @SuppressWarnings("unchecked")
+   final KeyedDeserializationSchema clonedDeserializer = (KeyedDeserializationSchema)SerializationUtils.clone(deserializer);
--- End diff --

Good catch




[GitHub] flink pull request: [FLINK-3313] [kafka] Fix concurrent TypeInform...

2016-02-02 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1577#issuecomment-178685691
  
Thanks for the review. Will merge it once Travis gives green light.




[GitHub] flink pull request: [FLINK-3313] [kafka] Fix concurrent TypeInform...

2016-02-02 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1577#discussion_r51595056
  
--- Diff: flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java ---
@@ -199,8 +200,11 @@ public void close() {

FetchPartition[] partitions = partitionsList.toArray(new FetchPartition[partitionsList.size()]);

+   @SuppressWarnings("unchecked")
+   final KeyedDeserializationSchema clonedDeserializer = (KeyedDeserializationSchema)SerializationUtils.clone(deserializer);
--- End diff --

whitespace after cast missing




[GitHub] flink pull request: [FLINK-3313] [kafka] Fix concurrent TypeInform...

2016-02-02 Thread uce
Github user uce commented on the pull request:

https://github.com/apache/flink/pull/1577#issuecomment-178680229
  
Good catch! :-)

