Repository: flink
Updated Branches:
  refs/heads/master fe0c3b539 -> 2eb2a0ef3


[FLINK-3338] [kafka] Use proper classloader when cloning the deserialization 
schema.

This closes #1590


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2eb2a0ef
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2eb2a0ef
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2eb2a0ef

Branch: refs/heads/master
Commit: 2eb2a0ef352f75a65a45a5a247450ae61ae5ab17
Parents: fe0c3b5
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Feb 4 21:14:39 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Feb 5 14:07:42 2016 +0100

----------------------------------------------------------------------
 .../apache/flink/util/InstantiationUtil.java    | 29 ++++++++++++++++++--
 .../connectors/kafka/FlinkKafkaConsumer08.java  |  3 +-
 .../kafka/internals/LegacyFetcher.java          | 26 ++++++++++++------
 .../kafka/testutils/MockRuntimeContext.java     |  2 +-
 4 files changed, 48 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2eb2a0ef/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java 
b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
index 1c6896f..e2439ca 100644
--- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
@@ -310,8 +310,33 @@ public final class InstantiationUtil {
         * @throws ClassNotFoundException
         */
        public static <T extends Serializable> T clone(T obj) throws 
IOException, ClassNotFoundException {
-               final byte[] serializedObject = serializeObject(obj);
-               return deserializeObject(serializedObject, 
obj.getClass().getClassLoader());
+               if (obj == null) {
+                       return null;
+               } else {
+                       return clone(obj, obj.getClass().getClassLoader());
+               }
+       }
+
+       /**
+        * Clones the given serializable object using Java serialization, using 
the given classloader to
+        * resolve the cloned classes.
+        *
+        * @param obj Object to clone
+        * @param classLoader The classloader to resolve the classes during 
deserialization.
+        * @param <T> Type of the object to clone
+        * 
+        * @return Cloned object
+        * 
+        * @throws IOException
+        * @throws ClassNotFoundException
+        */
+       public static <T extends Serializable> T clone(T obj, ClassLoader 
classLoader) throws IOException, ClassNotFoundException {
+               if (obj == null) {
+                       return null;
+               } else {
+                       final byte[] serializedObject = serializeObject(obj);
+                       return deserializeObject(serializedObject, classLoader);
+               }
        }
        
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/2eb2a0ef/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
index 543e0ff..bdea37f 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
@@ -251,7 +251,8 @@ public class FlinkKafkaConsumer08<T> extends 
FlinkKafkaConsumerBase<T> {
                }
                
                // create fetcher
-               fetcher = new LegacyFetcher(this.subscribedPartitions, props, 
getRuntimeContext().getTaskName());
+               fetcher = new LegacyFetcher(this.subscribedPartitions, props, 
+                               getRuntimeContext().getTaskName(), 
getRuntimeContext().getUserCodeClassLoader());
 
                // offset handling
                offsetHandler = new ZookeeperOffsetHandler(props);

http://git-wip-us.apache.org/repos/asf/flink/blob/2eb2a0ef/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
index 164cbac..fe7f777 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
@@ -31,11 +31,12 @@ import kafka.message.MessageAndOffset;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
 import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
-
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.StringUtils;
+
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.Node;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,7 +50,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicReference;
 
-import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.Objects.requireNonNull;
 
 /**
  * This fetcher uses Kafka's low-level API to pull data from a specific
@@ -70,6 +71,9 @@ public class LegacyFetcher implements Fetcher {
        
        /** The first error that occurred in a connection thread */
        private final AtomicReference<Throwable> error;
+       
+       /** The classloader for dynamically loaded classes */
+       private final ClassLoader userCodeClassloader;
 
        /** The partitions that the fetcher should read, with their starting 
offsets */
        private Map<KafkaTopicPartitionLeader, Long> partitionsToRead;
@@ -86,8 +90,13 @@ public class LegacyFetcher implements Fetcher {
        /** Flag to shot the fetcher down */
        private volatile boolean running = true;
 
-       public LegacyFetcher(List<KafkaTopicPartitionLeader> partitions, 
Properties props, String taskName) {
-               this.config = checkNotNull(props, "The config properties cannot 
be null");
+       public LegacyFetcher(
+                               List<KafkaTopicPartitionLeader> partitions, 
Properties props,
+                               String taskName, ClassLoader 
userCodeClassloader) {
+               
+               this.config = requireNonNull(props, "The config properties 
cannot be null");
+               this.userCodeClassloader = requireNonNull(userCodeClassloader);
+               
                //this.topic = checkNotNull(topic, "The topic cannot be null");
                this.partitionsToRead = new HashMap<>();
                for (KafkaTopicPartitionLeader p: partitions) {
@@ -200,7 +209,8 @@ public class LegacyFetcher implements Fetcher {
                        
                        FetchPartition[] partitions = 
partitionsList.toArray(new FetchPartition[partitionsList.size()]);
 
-                       final KeyedDeserializationSchema<T> clonedDeserializer 
= InstantiationUtil.clone(deserializer);
+                       final KeyedDeserializationSchema<T> clonedDeserializer =
+                                       InstantiationUtil.clone(deserializer, 
userCodeClassloader);
 
                        SimpleConsumerThread<T> thread = new 
SimpleConsumerThread<>(this, config,
                                        broker, partitions, sourceContext, 
clonedDeserializer, lastOffsets);
@@ -344,9 +354,9 @@ public class LegacyFetcher implements Fetcher {
                        this.config = config;
                        this.broker = broker;
                        this.partitions = partitions;
-                       this.sourceContext = checkNotNull(sourceContext);
-                       this.deserializer = checkNotNull(deserializer);
-                       this.offsetsState = checkNotNull(offsetsState);
+                       this.sourceContext = requireNonNull(sourceContext);
+                       this.deserializer = requireNonNull(deserializer);
+                       this.offsetsState = requireNonNull(offsetsState);
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/2eb2a0ef/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
index cd44236..17e2e6f 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
@@ -98,7 +98,7 @@ public class MockRuntimeContext extends 
StreamingRuntimeContext {
 
        @Override
        public ClassLoader getUserCodeClassLoader() {
-               throw new UnsupportedOperationException();
+               return getClass().getClassLoader();
        }
 
        @Override

Reply via email to